Python six.moves.urllib.parse 模块,quote() 实例源码

我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 six.moves.urllib.parse.quote()。

项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def encode(data, mime_type='', charset='utf-8', base64=True):
    """
    Serialise ``data`` as a ``data:`` URL.

    Text input is encoded with ``charset`` and the charset is advertised in
    the URL; any other input is used as-is and no charset is emitted.  The
    payload is base64-encoded when ``base64`` is true, percent-quoted
    otherwise.
    """
    if not isinstance(data, six.text_type):
        # Non-text input carries no known charset.
        charset = None
    else:
        data = data.encode(charset)

    encoder = b64encode if base64 else quote
    payload = utils.text(encoder(data))

    pieces = ['data:']
    if mime_type:
        pieces.append(mime_type)
    if charset:
        pieces.extend((';charset=', charset))
    if base64:
        pieces.append(';base64')
    pieces.extend((',', payload))

    return ''.join(pieces)
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_encoding_rdf():
    """RDF generated from an ISO-8859-1 CSV contains the percent-quoted unit."""
    # With encoding specified
    converter = CSVW(csv_path="./tests/iso_encoding.csv",
                     metadata_path="./tests/iso_encoding.csv-metadata.json",
                     csv_encoding="ISO-8859-1")
    graph = ConjunctiveGraph()
    graph.parse(data=converter.to_rdf(), format="turtle")

    units_ns = Namespace('http://example.org/units/')
    cars_ns = Namespace('http://example.org/cars/')
    props_ns = Namespace("http://example.org/properties/")

    # The expected term is the percent-quoted UTF-8 form of the value.
    quoted_unit = units_ns[quote(u"\xb5100".encode('utf-8'))]
    wanted_triple = (cars_ns['1'], props_ns['UnitOfMeasurement'], quoted_unit)
    assert wanted_triple in graph
    assert quoted_unit in list(graph.objects())
项目:python-scaleioclient    作者:codedellemc    | 项目源码 | 文件源码
def encode_string(value, double=False):
    """
    Percent-encode a string with no characters treated as safe, so reserved
     characters such as :, /, ?, #, & and = are all escaped. With
     'double=True' the encoded result is encoded a second time, which some
     REST api endpoints may require.
    :param value: Value to encode
    :param double: Double encode string
    :return: The encoded string
    """
    passes = 2 if double else 1

    # Each pass replaces special characters using the %xx escape
    encoded_str = value
    for _ in range(passes):
        encoded_str = quote(encoded_str, '')

    return encoded_str
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def get(self, stack_id, resource_name, with_attr=None):
        """Fetch one resource of a stack and wrap it in a ``Resource``.

        :param stack_id: ID of stack containing the resource
        :param resource_name: ID of resource to get the details for
        :param with_attr: Attributes to show
        """
        stack_id = self._resolve_stack_id(stack_id)
        # Quote with no safe characters so "/" inside a name cannot split
        # the path.
        quoted_stack = parse.quote(stack_id, '')
        quoted_resource = parse.quote(
            encodeutils.safe_encode(resource_name), '')
        url_str = '/stacks/%s/resources/%s' % (quoted_stack, quoted_resource)
        if with_attr:
            query = parse.urlencode({'with_attr': with_attr}, True)
            url_str += '?%s' % query

        body = utils.get_response_body(self.client.get(url_str))
        return Resource(self, body.get('resource'))
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def mark_unhealthy(self, stack_id, resource_name,
                       mark_unhealthy, resource_status_reason):
        """Send a PATCH that sets a resource's health flag and status reason.

        :param stack_id: ID of stack containing the resource
        :param resource_name: ID of resource
        :param mark_unhealthy: Mark resource unhealthy if set to True
        :param resource_status_reason: Reason for resource status change.
        :return: the decoded response body
        """
        stack_id = self._resolve_stack_id(stack_id)
        # Quote with no safe characters so "/" inside a name cannot split
        # the path.
        path_parts = (
            parse.quote(stack_id, ''),
            parse.quote(encodeutils.safe_encode(resource_name), ''),
        )
        payload = {"mark_unhealthy": mark_unhealthy,
                   "resource_status_reason": resource_status_reason}
        resp = self.client.patch('/stacks/%s/resources/%s' % path_parts,
                                 data=payload)

        return utils.get_response_body(resp)
项目:weatherlink-python    作者:beamerblvd    | 项目源码 | 文件源码
def _send_update(self, url, attribute_map, attribute_calculations, record):
        """Upload one record to ``url`` with an HTTP GET on ``self._session``.

        :param url: base URL; station ID, password, UTC date and the record's
            values are appended to it as ``&name=value`` pairs.
        :param attribute_map: iterable of ``(parameter, field)`` pairs (or
            ``None``); ``record[field]`` is sent when it is not ``None``.
        :param attribute_calculations: iterable of ``(parameter, function)``
            pairs (or ``None``); ``function(self, record)`` is sent when it
            returns something other than ``None``.
        :param record: mapping-like record that also exposes ``date``.
        :raises AssertionError: if the response status is not 2xx or the
            response text is not ``'success'``.
        """
        # pytz zones must be attached with localize(); datetime.replace()
        # would pick the zone's first (LMT) offset instead of the real one.
        # Any tzinfo already on the date is discarded, as before.
        naive = record.date.replace(tzinfo=None)
        zone = self.station_time_zone
        if hasattr(zone, 'localize'):
            local = zone.localize(naive)
        else:
            local = naive.replace(tzinfo=zone)
        d = local.astimezone(pytz.UTC).replace(tzinfo=None)

        pieces = [
            '%s&ID=%s&PASSWORD=%s&dateutc=%s' % (
                url,
                self.station_id,
                url_quote(self.password, safe=''),
                url_quote(d.strftime('%Y-%m-%d %H:%M:%S'), safe=''),
            ),
        ]

        for parameter, field in attribute_map or []:
            if field and record[field] is not None:
                pieces.append(
                    '&%s=%s' % (parameter, url_quote(record[field], safe='')))

        for parameter, function in attribute_calculations or []:
            if function:
                value = function(self, record)
                if value is not None:
                    pieces.append(
                        '&%s=%s' % (parameter, url_quote(value, safe='')))

        response = self._session.get(''.join(pieces))

        # Raise explicitly (same exception type as before) so the checks
        # still run when Python is started with -O.
        if not 200 <= response.status_code < 300:
            raise AssertionError(
                'Status code %s unexpected' % response.status_code)

        if response.text != 'success':
            raise AssertionError('Response "%s" unexpected' % response.text)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def code(item):
    """
    Serialise an object's attributes as a comma separated string of
    ``index=quoted-value`` pairs. The index is the attribute's position in
    ``ATTR``, so decoding depends on that order staying the same.
    Attributes with a falsy value are left out.

    :param item: The class instance
    :return: A quoted string
    """
    # The inner generator is lazy, so each attribute is read and quoted in
    # turn, position by position.
    pairs = [
        "%d=%s" % (position, quote(value))
        for position, value in enumerate(
            getattr(item, attr) for attr in ATTR)
        if value
    ]
    return ",".join(pairs)
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:open-mic    作者:cosmir    | 项目源码 | 文件源码
def check_connection(default='http://google.com', timeout=1):
    """Test the internet connection.

    Parameters
    ----------
    default : str
        URL to test; defaults to Google's home page.

    timeout : number
        Time in seconds to wait before giving up.

    Returns
    -------
    success : bool
        True if appears to be online, else False
    """
    # Keep the URL delimiters intact while escaping everything else.
    surl = urlparse.quote(default, safe=':./')
    try:
        response = urlrequest.urlopen(surl, timeout=timeout)
    except urlerror.URLError as derp:
        logger.debug("Network unreachable: {}".format(derp))
        return False
    # Only reachability matters; release the socket instead of leaking it.
    response.close()
    return True
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]

    if issuer is not None:
        parameters.append(("issuer", issuer))

    parameters.extend(extra_parameters)

    uriparts = {
        "type": type_name,
        "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer
                  else quote(account_name)),
        "parameters": urlencode(parameters),
    }
    return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def get_local_path(self, img, pfmri):
                """Return the path under the image's license directory where
                this action's license text is stored.  The path is built
                only; its existence is not checked."""

                legacy_layout = img.version <= 3
                license_dir = img.get_license_dir(pfmri)
                name = self.attrs["license"]
                if not legacy_layout:
                        # Newer images ensure licenses are stored with encoded
                        # name so that '/', spaces, etc. are properly handled.
                        # (Older images stored the name unencoded.)
                        name = quote(name, "")
                return os.path.join(license_dir, "license." + name)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def abandon_0(self, *tokens):
                """Abandon the in-flight transaction whose ID is the first
                path token.  Returns no output; a repository error is
                reported to the client as a Bad Request."""

                # cherrypy decoded it, but we actually need it encoded.
                trans_id = quote(tokens[0], "") if tokens else None

                try:
                        self.repo.abandon(trans_id)
                except srepo.RepositoryError as e:
                        # Assume a bad request was made.  A 404 can't be
                        # returned here as misc.versioned_urlopen will interpret
                        # that to mean that the server doesn't support this
                        # operation.
                        raise cherrypy.HTTPError(http_client.BAD_REQUEST, str(e))
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def manifest(self, *tokens):
                """Redirect a manifest request to a URI whose package-name
                part is re-encoded.

                Manifest requests coming from the BUI need to be redirected
                back through the RewriteRules defined in the Apache
                configuration in order to be served directly.
                pkg(1) will never hit this code, as those requests don't get
                handled by this webapp.
                """

                self.setup(cherrypy.request)

                # we need to recover the escaped portions of the URI
                segments = cherrypy.request.path_info.lstrip("/").split("/")
                pub_mf = "/".join(segments[:4])

                # encode the URI so our RewriteRules can process them; only
                # the first encoded "@" is turned back into a literal one
                pkg_name = quote("/".join(segments[4:]))
                pkg_name = pkg_name.replace("/", "%2F").replace("%40", "@", 1)

                # build a URI that we can redirect to
                target = "{0}/{1}".format(pub_mf, pkg_name)
                raise cherrypy.HTTPRedirect("/{0}".format(target.lstrip("/")))
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_project(self, repo):
        """GET the project identified by ``repo``.

        The repo is quoted with no safe characters, so a ``/`` in it is
        escaped as well.
        """
        path = '/projects/{}'.format(quote(repo, safe=''))
        return self.request('GET', path)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_issue(self, repo, issue_id):
        """Return the first issue of ``repo`` matching ``issue_id``.

        :raises ApiError: (404) when the lookup yields no first element.
        """
        path = '/projects/{}/issues'.format(quote(repo, safe=''))
        # XXX(dcramer): this is an undocumented API
        query = {'iid': issue_id}
        try:
            matches = self.request('GET', path, params=query)
            return matches[0]
        except IndexError:
            raise ApiError('Issue not found with ID', 404)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def create_issue(self, repo, data):
        """POST ``data`` to the issues endpoint of ``repo``.

        The repo is quoted with no safe characters.
        """
        path = '/projects/{}/issues'.format(quote(repo, safe=''))
        return self.request('POST', path, data=data)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def create_note(self, repo, global_issue_id, data):
        """POST ``data`` to the notes endpoint of an issue in ``repo``.

        The repo is quoted with no safe characters; the issue ID is
        inserted as given.
        """
        path = '/projects/{}/issues/{}/notes'.format(
            quote(repo, safe=''), global_issue_id)
        return self.request('POST', path, data=data)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def list_project_members(self, repo):
        """GET the members endpoint of ``repo``.

        The repo is quoted with no safe characters.
        """
        path = '/projects/{}/members'.format(quote(repo, safe=''))
        return self.request('GET', path)
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def query_single(self, object_type, url_params, query_params=None):
        # type: (str, list, dict) -> dict
        """
        Look up one object and return it.
        :param object_type: string query type (e.g., "users" or "groups")
        :param url_params: required list of strings to provide as additional URL components
        :param query_params: optional dictionary of query options
        :return: the found object (a dictionary), which is empty if none were found
        :raises ClientError: when the call succeeds but the body's "result" is not "success"
        """
        # Server API convention (v2) is that the pluralized object type goes into the endpoint
        # but the object type is the key in the response dictionary for the returned object.
        self.local_status["single-query-count"] += 1
        query_type = object_type + "s"  # poor man's plural
        segments = ["/organizations/{}/{}".format(self.org_id, query_type)]
        # "/" and "@" are left unescaped inside the extra components.
        segments.extend(urlparse.quote(component, safe='/@')
                        for component in url_params or [])
        query_path = "/".join(segments)
        if query_params:
            query_path += "?" + urlparse.urlencode(query_params)

        try:
            result = self.make_call(query_path)
            body = result.json()
        except RequestError as err:
            # A 404 means "no such object"; anything else is re-raised.
            if err.result.status_code != 404:
                raise err
            if self.logger:
                self.logger.debug("Ran %s query: %s %s (0 found)",
                                  object_type, url_params, query_params)
            return {}

        if body.get("result") != "success":
            raise ClientError("OK status but no 'success' result", result)
        value = body.get(object_type, {})
        if self.logger:
            self.logger.debug("Ran %s query: %s %s (1 found)", object_type, url_params, query_params)
        return value
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def query_multiple(self, object_type, page=0, url_params=None, query_params=None):
        # type: (str, int, list, dict) -> tuple
        """
        Query for a page of objects.  Defaults to the (0-based) first page.
        Sadly, the sort order is undetermined.
        :param object_type: string query type (e.g., "users" or "groups")
        :param page: numeric page (0-based) of results to get (up to 200 in a page)
        :param url_params: optional list of strings to provide as additional URL components
        :param query_params: optional dictionary of query options
        :return: tuple (list of returned dictionaries (one for each query result), bool for whether this is last page)
        """
        # Server API convention (v2) is that the pluralized object type goes into the endpoint
        # and is also the key in the response dictionary for the returned objects.
        self.local_status["multiple-query-count"] += 1
        query_type = object_type + "s"  # poor man's plural
        query_path = "/{}/{}/{:d}".format(query_type, self.org_id, page)
        for component in url_params if url_params else []:
            query_path += "/" + urlparse.quote(component)
        if query_params: query_path += "?" + urlparse.urlencode(query_params)
        try:
            result = self.make_call(query_path)
            body = result.json()
        except RequestError as re:
            if re.result.status_code == 404:
                if self.logger: self.logger.debug("Ran %s query: %s %s (0 found)",
                                                  object_type, url_params, query_params)
                return [], True
            else:
                raise re
        if body.get("result") == "success":
            values = body.get(query_type, [])
            last_page = body.get("lastPage", False)
            if self.logger: self.logger.debug("Ran multi-%s query: %s %s (page %d: %d found)",
                                              object_type, url_params, query_params, page, len(values))
            return values, last_page
        else:
            raise ClientError("OK status but no 'success' result", result)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def __extract_metadata(self, doc, payload):
    """PUT the payload to the configured Tika ``/meta`` endpoint.

    Returns the decoded JSON response, or an empty dict when the server
    answers with status >= 400 or the body cannot be decoded. The payload
    is rewound to position 0 before and after being read.
    """
    headers = {
      'Accept': 'application/json',
      'Content-Disposition':
          'attachment; filename=%s' % url_quote(os.path.basename(doc.path))
    }

    content_type = doc.meta['Content-Type']
    if content_type:
      headers['Content-Type'] = content_type

    tika_url = self.config.get(helper.TIKA_META)
    connection = self.config[helper.INJECTOR].get_http_connection(tika_url)
    payload.seek(0)
    request_body = payload.read()
    connection.request('PUT', '/meta', request_body, headers)
    payload.seek(0)

    response = connection.getresponse()

    # Always close the response, whether or not it is read.
    try:
      if response.status >= 400:
        logging.error('tika error %d (%s): %s', response.status,
                      response.reason, doc.path)
        return {}
      response_data = response.read()
    finally:
      response.close()

    try:
      return json.loads(response_data.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
      logging.error('invalid response from tika for %s', doc.path)
      return {}
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _id_to_header(self, id_):
    """Convert an id to a Content-ID header value.

    Args:
      id_: string, identifier of individual request.

    Returns:
      A Content-ID header with the id_ encoded into it. A UUID is prepended to
      the value because Content-ID headers are supposed to be universally
      unique.
    """
    if self._base_id is None:
      self._base_id = uuid.uuid4()

    return '<%s+%s>' % (self._base_id, quote(id_))
项目:h1-python    作者:uber-common    | 项目源码 | 文件源码
def encode_params(obj, keys=()):
    """Flatten nested dicts/lists into a bracket-style query string.

    Dict keys become ``key[sub]`` and list items ``key[]``; the key path
    and the value are each percent-quoted.

    :param obj: dict, list/tuple, or scalar value to encode.
    :param keys: tuple of keys leading to ``obj``; used by the recursion.
    :return: the encoded ``name=value`` pairs joined with ``&``.
    """
    if isinstance(obj, dict):
        # key[bar]=1&key[baz]=2
        return "&".join(
            encode_params(val, keys + (key,)) for key, val in obj.items())
    if isinstance(obj, (list, tuple)):
        # key[foo][]=1&key[foo][]=2
        return "&".join(encode_params(val, keys + ("",)) for val in obj)

    # `obj` is just a value, key=1
    # All keys but top-level keys should be in brackets, i.e.
    # `key[foo]=1`, not `[key][foo]=1`
    encoded_keys = "".join(
        key if depth == 0 else "[" + key + "]"
        for depth, key in enumerate(keys))

    # quote() only accepts text or bytes; stringify other scalars (e.g.
    # numbers) so that `key=1` works instead of raising.
    if not isinstance(obj, (bytes, type(u""))):
        obj = str(obj)
    return quote(encoded_keys) + "=" + quote(obj)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def swift_get_container(request, container_name, with_data=True):
    """Build a ``Container`` describing one Swift container.

    With ``with_data`` the headers and data come from a ``get_object`` call
    using an empty object name; otherwise only the container headers are
    fetched and ``data`` is ``None``. The public URL and timestamp are best
    effort: any error while deriving them leaves the remaining fields at
    their defaults.
    """
    api = swift_api(request)
    if with_data:
        headers, data = api.get_object(container_name, "")
    else:
        data = None
        headers = api.head_container(container_name)

    created_at = None
    readable_by_all = False
    url = None
    try:
        readable_by_all = GLOBAL_READ_ACL in headers.get('x-container-read', '')
        if readable_by_all:
            endpoint = base.url_for(request,
                                    'object-store',
                                    endpoint_type='publicURL')
            quoted_name = urlparse.quote(container_name.encode('utf8'))
            url = endpoint + '/' + quoted_name
        epoch_seconds = float(headers.get('x-timestamp'))
        created_at = datetime.utcfromtimestamp(epoch_seconds).isoformat()
    except Exception:
        # Deliberately best effort: keep whatever was derived so far.
        pass

    return Container({
        'name': container_name,
        'container_object_count': headers.get('x-container-object-count'),
        'container_bytes_used': headers.get('x-container-bytes-used'),
        'timestamp': created_at,
        'data': data,
        'is_public': readable_by_all,
        'public_url': url,
    })
项目:python-pankoclient    作者:openstack    | 项目源码 | 文件源码
def get_pagination_options(limit=None, marker=None, sorts=None):
    """Render pagination arguments as an ``&``-joined query fragment.

    :param limit: page size; left out when falsy.
    :param marker: marker value, percent-quoted; left out when falsy.
    :param sorts: iterable of sort expressions, each percent-quoted and
        emitted as its own ``sort=`` entry.
    :return: the joined fragment (an empty string when nothing was given).
    """
    limit_part = ["limit=%d" % limit] if limit else []
    marker_part = ["marker=%s" % urllib_parse.quote(marker)] if marker else []
    sort_parts = ["sort=%s" % urllib_parse.quote(sort) for sort in sorts or []]
    return "&".join(limit_part + marker_part + sort_parts)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def oauth2_authorize(request):
    """ View that begins the OAuth2 Authorization flow.

     If scopes is passed in as a GET URL parameter, those scopes are
     authorized, otherwise the default scopes specified in settings. The
     return_url can also be specified as a GET parameter, otherwise the
     referer header will be checked, and if that isn't found it will return
     to the root path.

    Args:
       request: The Django request object.

    Returns:
         A redirect: to the login page, to return_url when credentials
         already exist, or to the authorization URL produced by the flow.
    """
    return_url = (request.GET.get('return_url', None) or
                  request.META.get('HTTP_REFERER', '/'))

    scopes = request.GET.getlist('scopes', django_util.oauth2_settings.scopes)
    # Model storage (but not session storage) requires a logged in user
    if django_util.oauth2_settings.storage_model:
        if not request.user.is_authenticated():
            login_target = '{0}?next={1}'.format(
                settings.LOGIN_URL, parse.quote(request.get_full_path()))
            return redirect(login_target)
        # This checks for the case where we ended up here because of a logged
        # out user but we had credentials for it in the first place
        user_oauth = django_util.UserOAuth2(request, scopes, return_url)
        if user_oauth.has_credentials():
            return redirect(return_url)

    flow = _make_flow(request=request, scopes=scopes, return_url=return_url)
    return shortcuts.redirect(flow.step1_get_authorize_url())
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _id_to_header(self, id_):
    """Convert an id to a Content-ID header value.

    Args:
      id_: string, identifier of individual request.

    Returns:
      A Content-ID header with the id_ encoded into it. A UUID is prepended to
      the value because Content-ID headers are supposed to be universally
      unique.
    """
    if self._base_id is None:
      self._base_id = uuid.uuid4()

    return '<%s+%s>' % (self._base_id, quote(id_))
项目:zcp    作者:apolloliu    | 项目源码 | 文件源码
def quote_key(key, reverse=False):
    """Yield the dot-separated parts of a key, quoting those starting with '$'.

    :param key: key that should be quoted
    :param reverse: boolean, True --- if we need a reverse order of the keys
                    parts
    :return: iter of quoted part of the key
    """
    parts = key.split('.')
    if reverse:
        parts = parts[::-1]

    for part in parts:
        yield parse.quote(part) if part.startswith('$') else part
项目:zcp    作者:apolloliu    | 项目源码 | 文件源码
def improve_keys(data, metaquery=False):
    """Improves keys in dict if they contained '.' or started with '$'.

    The dictionary is modified in place and also returned.

    :param data: is a dictionary where keys need to be checked and improved
    :param metaquery: boolean, if True dots are not escaped from the keys
    :return: improved dictionary if keys contained dots or started with '$':
            {'a.b': 'v'} -> {'a': {'b': 'v'}}
            {'$ab': 'v'} -> {'%24ab': 'v'}
            Anything that is not a dict is returned unchanged.
    """
    if not isinstance(data, dict):
        return data

    if metaquery:
        # Iterate over a snapshot of the keys: entries are renamed inside
        # the loop, and mutating a dict while iterating it raises
        # RuntimeError on Python 3.
        for key in list(data):
            if '.$' in key:
                data['.'.join(quote_key(key))] = data.pop(key)
    else:
        # Same reason for the snapshot here: keys are popped and re-added.
        for key, value in list(data.items()):
            if isinstance(value, dict):
                improve_keys(value)
            if '.' in key:
                # Build the nested dict from the innermost part outwards.
                new_dict = {}
                for k in quote_key(key, reverse=True):
                    new = {}
                    new[k] = new_dict if new_dict else data.pop(key)
                    new_dict = new
                # NOTE(review): update() replaces an existing top-level
                # entry, so 'a.b' and 'a.c' do not merge -- confirm that
                # this is intended.
                data.update(new_dict)
            elif key.startswith('$'):
                data[parse.quote(key)] = data.pop(key)
    return data
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def get(self, user_id, return_request_id=None):
        """Fetch a single user and wrap it in the manager's resource class.

        :param user_id: ID of the user
        :param return_request_id: If an empty list is provided, populate this
                              list with the request ID value from the header
                              x-openstack-request-id
        """
        path = '/users/%s' % parse.quote(str(user_id))
        resp, body = self.client.get(path)
        if return_request_id is not None:
            request_id = resp.headers.get(OS_REQ_ID_HDR, None)
            return_request_id.append(request_id)
        return self.resource_class(self, body.get('user'), loaded=True)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def action(self, user_id, **kwargs):
        """POST an action for a user and return the resulting resource.

        :param user_id: ID of the user
        :param kwargs: sent as the request data; an optional
                       ``return_req_id`` list is removed first and, when
                       given, receives the response's request ID header
        """
        url = '/users/%s/action' % parse.quote(str(user_id))
        request_ids = kwargs.pop('return_req_id', None)
        resp, body = self.client.post(url, data=kwargs)
        if request_ids is not None:
            request_ids.append(resp.headers.get(OS_REQ_ID_HDR, None))
        user = body.get('user')
        return self.resource_class(self, user, loaded=True)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def get(self, policy_id):
        """Fetch one policy and wrap it in the manager's resource class."""
        path = '/policies/%s' % parse.quote(str(policy_id))
        _resp, body = self.client.get(path)
        policy = body.get('policy')
        return self.resource_class(self, policy, loaded=True)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def action(self, policy_id, **kwargs):
        """POST an action for a policy; ``kwargs`` is sent as request data."""
        path = '/policies/%s/action' % parse.quote(str(policy_id))
        _resp, body = self.client.post(path, data=kwargs)
        policy = body.get('policy')
        return self.resource_class(self, policy, loaded=True)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def delete(self, policy_id):
        """Delete one policy, addressed by its quoted ID."""
        path = '/policies/%s' % parse.quote(str(policy_id))
        return self._delete(path)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def delete(self, rule_id):
        """Delete one rule, addressed by its quoted ID.

        :param rule_id: Id of the rule to delete
        """
        path = '/rules/%s' % parse.quote(str(rule_id))
        return self._delete(path)