Python six.moves.urllib.parse 模块,urlunparse() 实例源码


项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def sort_url_by_query_keys(url):
    """A helper function which sorts the keys of the query string of a url.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
       prevent non-deterministic ordering of the query string causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    parsed = urlparse.urlparse(url)
    queries = urlparse.parse_qsl(parsed.query, True)
    sorted_query = sorted(queries, key=lambda x: x[0])

    encoded_sorted_query = urlparse.urlencode(sorted_query, True)

    url_parts = (parsed.scheme, parsed.netloc, parsed.path,
                 parsed.params, encoded_sorted_query,

    return urlparse.urlunparse(url_parts)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def validate_link(self, link, bookmark=False):
        """Checks if the given link can get correct data."""
        # removes the scheme and net location parts of the link
        url_parts = list(urlparse.urlparse(link))
        url_parts[0] = url_parts[1] = ''

        # bookmark link should not have the version in the URL
        if bookmark and url_parts[2].startswith(PATH_PREFIX):
            return False

        full_path = urlparse.urlunparse(url_parts)
            self.get_json(full_path, path_prefix='')
            return True
        except Exception:
            return False
项目:portia2code    作者:scrapinghub    | 项目源码 | 文件源码
def extract_image_url(text):
    text = _strip_url(text)
    imgurl = None
    if text:
        # check if the text is style content
        match =
        text = match.groups()[0] if match else text
        parsed = urlparse(text)
        path = None
        match =
        if match:
            path =
        elif parsed.query:
            match =
            if match:
                path =
        if path is not None:
            parsed = list(parsed)
            parsed[2] = path
            imgurl = urlunparse(parsed)
        if not imgurl:
            imgurl = text
    return imgurl
项目:deb-oslo.vmware    作者:openstack    | 项目源码 | 文件源码
def _fix_esx_url(url, host, port):
        """Fix netloc in the case of an ESX host.

        In the case of an ESX host, the netloc is set to '*' in the URL
        returned in HttpNfcLeaseInfo. It should be replaced with host name
        or IP address.
        urlp = urlparse.urlparse(url)
        if urlp.netloc == '*':
            scheme, netloc, path, params, query, fragment = urlp
            if netutils.is_valid_ipv6(host):
                netloc = '[%s]:%d' % (host, port)
                netloc = "%s:%d" % (host, port)
            url = urlparse.urlunparse((scheme,
        return url
项目:myautotest    作者:auuppp    | 项目源码 | 文件源码
def _decorate_request(self, filters, method, url, headers=None, body=None,
        if auth_data is None:
            auth_data = self.auth_data
        token, _ = auth_data
        #base_url = self.base_url(filters=filters, auth_data=auth_data)
        base_url = url
        # build authenticated request
        # returns new request, it does not touch the original values
        _headers = copy.deepcopy(headers) if headers is not None else {}
        _headers['X-Auth-Token'] = str(token)
        if url is None or url == "":
            _url = base_url
            # Join base URL and url, and remove multiple contiguous slashes
            _url = "/".join([base_url, url])
            parts = [x for x in urlparse.urlparse(_url)]
            parts[2] = re.sub("/{2,}", "/", parts[2])
            _url = urlparse.urlunparse(parts)
        # no change to method or body
        return str(_url), _headers, body
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def config():
    answer = input("Please enter the URL for your custom Quilt registry (ask your administrator),\n" +
                   "or leave this line blank to use the default registry: ")
    if answer:
        url = urlparse(answer.rstrip('/'))
        if (url.scheme not in ['http', 'https'] or not url.netloc or
            url.path or url.params or url.query or url.fragment):
            raise CommandException("Invalid URL: %s" % answer)
        canonical_url = urlunparse(url)
        # When saving the config, store '' instead of the actual URL in case we ever change it.
        canonical_url = ''

    cfg = _load_config()
    cfg['registry_url'] = canonical_url

    # Clear the cached URL.
    global _registry_url
    _registry_url = None
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def ticket_request_url(
        url, fmt=None, reference_name=None, reference_md5=None,
        start=None, end=None, fields=None, tags=None, notags=None,
    parsed_url = urlparse(url)
    get_vars = parse_qs(parsed_url.query)
    # TODO error checking
    if reference_name is not None:
        get_vars["referenceName"] = reference_name
    if reference_md5 is not None:
        get_vars["referenceMD5"] = reference_md5
    if start is not None:
        get_vars["start"] = int(start)
    if end is not None:
        get_vars["end"] = int(end)
    if data_format is not None:
        get_vars["format"] = data_format.upper()
    # if fields is not None:
    #     get_vars["fields"] = ",".join(fields)
    # if tags is not None:
    #     get_vars["tags"] = ",".join(tags)
    # if notags is not None:
    #     get_vars["notags"] = ",".join(notags)
    new_url = list(parsed_url)
    new_url[4] = urlencode(get_vars, doseq=True)
    return urlunparse(new_url)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def run(self):
        self.data_format = self.ticket.get("format", "BAM")
        self.md5 = self.ticket.get("md5", None)
        for url_object in self.ticket["urls"]:
            url = urlparse(url_object["url"])
            if url.scheme.startswith("http"):
                headers = url_object.get("headers", "")
                self.__retry(self._handle_http_url, urlunparse(url), headers)
            elif url.scheme == "data":
                raise ValueError("Unsupported URL scheme:{}".format(url.scheme))
项目:panko    作者:openstack    | 项目源码 | 文件源码
def __init__(self, url, conf):
        db_name = 'panko_%s' % uuidutils.generate_uuid(dashed=False)
        engine = sqlalchemy.create_engine(url)
        conn = engine.connect()
        self._create_database(conn, db_name)
        parsed = list(urlparse.urlparse(url))
        parsed[2] = '/' + db_name
        self.url = urlparse.urlunparse(parsed)
        self.conf = conf
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def iter_json_batches(inputs, base_url, batch_size, keep_instance_path):
    parsed_base_url = urlparse(base_url)

    current_uri = None
    current_batch = []

    for href, resource in inputs:
        # Skip over links-only (discovery) resources
        if resource.keys() == ["_links"]:

        # Inject the base URL's scheme and netloc; `urljoin` should do exactly this operation,
        # but actually won't if the right-hand-side term defines its own netloc
        parsed_href = urlparse(href)
        uri = urlunparse(parsed_href._replace(

        if batch_size == 1:
            yield (uri, [resource])
            # batch handling
            if keep_instance_path:
                collection_uri = uri.rsplit("?", 1)[0]
                collection_uri = uri.rsplit("/", 1)[0]

            if any((
                    current_uri is not None and current_uri != collection_uri,
                    len(current_batch) >= batch_size,
                yield (current_uri, current_batch)
                current_batch = []

            current_uri = collection_uri

    if current_batch:
        yield (current_uri, current_batch)
项目:python-base    作者:kubernetes-client    | 项目源码 | 文件源码
def get_websocket_url(url):
    parsed_url = urlparse(url)
    parts = list(parsed_url)
    if parsed_url.scheme == 'http':
        parts[0] = 'ws'
    elif parsed_url.scheme == 'https':
        parts[0] = 'wss'
    return urlunparse(parts)
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def _build_conf_providers(self):

        def _schemed_url(uri):
            uri = uri.strip('/')
            return urlparse.urlparse(
                uri if uri.startswith('http') else
                "%s://%s" % (self._http_provider.default_scheme, uri))

        conf_urls = self.nsxlib_config.nsx_api_managers[:]
        urls = []
        providers = []
        provider_index = -1
        for conf_url in conf_urls:
            provider_index += 1
            conf_url = _schemed_url(conf_url)
            if conf_url in urls:
                LOG.warning("'%s' already defined in configuration file. "
                            "Skipping.", urlparse.urlunparse(conf_url))
        return providers
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def get_endpoint_from_parsed_url(parsed_url):
        url_list = [(lambda: x if e < 2 else '')() for e, x in
        return urlparse.urlunparse(url_list)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def _build_url(url, _params):
    """Build the actual URL to use."""

    # Support for unicode domain names and paths.
    scheme, netloc, path, params, query, fragment = urlparse(url)
    netloc = netloc.encode('idna').decode('utf-8')
    if not path:
        path = '/'

    if six.PY2:
        if isinstance(scheme, six.text_type):
            scheme = scheme.encode('utf-8')
        if isinstance(netloc, six.text_type):
            netloc = netloc.encode('utf-8')
        if isinstance(path, six.text_type):
            path = path.encode('utf-8')
        if isinstance(params, six.text_type):
            params = params.encode('utf-8')
        if isinstance(query, six.text_type):
            query = query.encode('utf-8')
        if isinstance(fragment, six.text_type):
            fragment = fragment.encode('utf-8')

    enc_params = _encode_params(_params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
            query = enc_params
    url = (urlunparse([scheme, netloc, path, params, query, fragment]))
    return url
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def create_plugin(self, session, version, url, raw_status=None):
        """Handle default Keystone endpoint configuration

        Build the actual API endpoint from the scheme, host and port of the
        original auth URL and the rest from the returned version URL.

        ver_u = urlparse.urlparse(url)

        # Only hack this if it is the default setting
        if ver_u.netloc.startswith('localhost'):
            auth_u = urlparse.urlparse(self.auth_url)
            # from original auth_url: scheme, netloc
            # from api_url: path, query (basically, the rest)
            url = urlparse.urlunparse((
            LOG.debug('Version URL updated: %s', url)

        return super(OSCGenericPassword, self).create_plugin(
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def relative(self, url_string):
        url_bits = urllib_parse.urlparse(url_string)
        return urllib_parse.urlunparse(('', '', url_bits.path, url_bits.params, url_bits.query, url_bits.fragment))
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def start_requests(self):
        In the scrapy doc there are two ways to tell scrapy where to begin to
        crawl from.  One is start_requests, the other is start_urls which is
        shortcut to the start_requestt.

        Based on my experience, it is better to use start_requests instead of
        start_urls bacause in this methods you can know how the request object
        are created and how request is yield. You should keep it simple and
        try not to use some magic or it might confuse you.

        In this project, you have no need to change code in this method, just
        modify code in parse_entry_page

        If you fully understatnd how scrapy work, then you are free to choose
        between start_requests and start_urls.
        prefix = self.settings["WEB_APP_PREFIX"]
        result = parse.urlparse(prefix)
        base_url = parse.urlunparse(
            (result.scheme, result.netloc, "", "", "", "")
        # Generate start url from config and self.entry, when you paste the code
        # to another spider you can just change self.entry and self.taskid
        url = parse.urljoin(base_url, self.entry)
        request = Request(url=url, callback=self.parse_entry_page)
            'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
            'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = ''
        request.headers['DNT'] = 1
        # request.cookies['token'] = '4TO4N49X81'

        yield request
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def start_requests(self):
        In the scrapy doc there are two ways to tell scrapy where to begin to
        crawl from.  One is start_requests, the other is start_urls which is
        shortcut to the start_requestt.

        Based on my experience, it is better to use start_requests instead of
        start_urls bacause in this methods you can know how the request object
        are created and how request is yield. You should keep it simple and
        try not to use some magic or it might confuse you.

        In this project, you have no need to change code in this method, just
        modify code in parse_entry_page

        If you fully understatnd how scrapy work, then you are free to choose
        between start_requests and start_urls.
        prefix = self.settings["WEB_APP_PREFIX"]
        result = parse.urlparse(prefix)
        base_url = parse.urlunparse(
            (result.scheme, result.netloc, "", "", "", "")
        # Generate start url from config and self.entry, when you paste the code
        # to another spider you can just change self.entry and self.taskid
        url = parse.urljoin(base_url, self.entry)
        request = Request(url=url, callback=self.parse_entry_page)
            'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
            'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = ''
        request.headers['DNT'] = 1
        yield request
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def start_requests(self):
        In the scrapy doc there are two ways to tell scrapy where to begin to
        crawl from.  One is start_requests, the other is start_urls which is
        shortcut to the start_requestt.

        Based on my experience, it is better to use start_requests instead of
        start_urls bacause in this methods you can know how the request object
        are created and how request is yield. You should keep it simple and
        try not to use some magic or it might confuse you.

        In this project, you have no need to change code in this method, just
        modify code in parse_entry_page

        If you fully understatnd how scrapy work, then you are free to choose
        between start_requests and start_urls.
        prefix = self.settings["WEB_APP_PREFIX"]
        result = parse.urlparse(prefix)
        base_url = parse.urlunparse(
            (result.scheme, result.netloc, "", "", "", "")
        # Generate start url from config and self.entry, when you paste the code
        # to another spider you can just change self.entry and self.taskid
        url = parse.urljoin(base_url, self.entry)
        request = Request(url=url, callback=self.parse_entry_page)
            'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
            'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = ''
        request.headers['DNT'] = 1
        yield request
项目:fulmar    作者:tylderen    | 项目源码 | 文件源码
def build_url(url, _params):
    """Build the actual URL to use."""

    # Support for unicode domain names and paths.
    scheme, netloc, path, params, query, fragment = urlparse(url)
    netloc = netloc.encode('idna').decode('utf-8')
    if not path:
        path = '/'

    if six.PY2:
        if isinstance(scheme, six.text_type):
            scheme = scheme.encode('utf-8')
        if isinstance(netloc, six.text_type):
            netloc = netloc.encode('utf-8')
        if isinstance(path, six.text_type):
            path = path.encode('utf-8')
        if isinstance(params, six.text_type):
            params = params.encode('utf-8')
        if isinstance(query, six.text_type):
            query = query.encode('utf-8')
        if isinstance(fragment, six.text_type):
            fragment = fragment.encode('utf-8')

    enc_params = encode_params(_params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
            query = enc_params
    url = (urlunparse([scheme, netloc, path, params, query, fragment]))
    return url
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def _parse_contents(self, response):
        # Wix pages aren't really parseable, so anytime we see them,
        # let's re-run it (depth-1) with an escaped-fragment to get the real html source
        if '' in response.body and '_escaped_fragment_' not in response.url:
            parsed_url = urlparse(response.url)
            qs = parse_qs(parsed_url.query)
            qs['_escaped_fragment_'] = ''
            wix_scrapeable_url = urlunparse(
                (parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, urlencode(qs), parsed_url.fragment)
            response.meta['depth'] -= 1
            return [scrapy.Request(wix_scrapeable_url, self.parse)]

        if not hasattr(response, 'selector'):
  'Skipping unknown file from: %s', response.url)
        # Get all text contents of tags (unless they are script or style tags)
        text_contents = ' '.join(response.selector.xpath('//*[not(self::script|self::style)]/text()').extract()).lower()

        processed_text = event_classifier.StringProcessor(text_contents, regex_keywords.WORD_BOUNDARIES)
        wrong = processed_text.get_tokens(keywords.DANCE_WRONG_STYLE)
        good = processed_text.get_tokens(rules.STREET_STYLE)
        if (wrong or good):
            #print response.url, set(wrong), set(good)
项目:python-percy    作者:teeberg    | 项目源码 | 文件源码
def strip_fragment(self, path):
        result = urlparse(path)
        result = result._replace(fragment='')
        return six.text_type(urlunparse(result))
项目:python-zeep    作者:mvantellingen    | 项目源码 | 文件源码
def url_http_to_https(value):
    parts = urlparse(value)
    if parts.scheme != 'http':
        return value

    # Check if the url contains ':80' and remove it if that is the case
    netloc_parts = parts.netloc.rsplit(':', 1)
    if len(netloc_parts) == 2 and netloc_parts[1] == '80':
        netloc = netloc_parts[0]
        netloc = parts.netloc
    return urlunparse(('https', netloc) + parts[2:])
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def expand_redirect_url(starting_url, key, bucket):
    """ Add key and bucket parameters to starting URL query string. """
    parsed = urlparse.urlparse(starting_url)
    query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
    query.update([('key', key), ('bucket', bucket)])

    redirect_url = urlparse.urlunparse((
        parsed.scheme, parsed.netloc, parsed.path,
        parsed.params, urlparse.urlencode(query), None))

    return redirect_url
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def get_repo_url(self):
                return urlunparse(("file", "", pathname2url(
                    self.__dir), "", "", ""))
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __new__(cls, origin_url, create_repo=False, pkg_name=None,
            repo_props=EmptyDict, trans_id=None, noexecute=False, xport=None,
            pub=None, progtrack=None):

                scheme, netloc, path, params, query, fragment = \
                    urlparse(origin_url, "http", allow_fragments=0)
                scheme = scheme.lower()

                if noexecute:
                        scheme = "null"
                if scheme != "null" and (not xport or not pub):
                        raise TransactionError("Caller must supply transport "
                            "and publisher.")
                if scheme not in cls.__schemes:
                        raise TransactionRepositoryURLError(origin_url,
                if scheme.startswith("http") and not netloc:
                        raise TransactionRepositoryURLError(origin_url,
                if scheme.startswith("file"):
                        if netloc:
                                raise TransactionRepositoryURLError(origin_url,
                                    msg="'{0}' contains host information, which "
                                    "is not supported for filesystem "
                        # as we're urlunparsing below, we need to ensure that
                        # the path starts with only one '/' character, if any
                        # are present
                        if path.startswith("/"):
                                path = "/" + path.lstrip("/")
                        elif not path:
                                raise TransactionRepositoryURLError(origin_url)

                # Rebuild the url with the sanitized components.
                origin_url = urlunparse((scheme, netloc, path, params,
                    query, fragment))

                return cls.__schemes[scheme](origin_url,
                    create_repo=create_repo, pkg_name=pkg_name,
                    repo_props=repo_props, trans_id=trans_id, xport=xport,
                    pub=pub, progtrack=progtrack)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def parse_uri(uri, cwd=None):
        """Parse the repository location provided and attempt to transform it
        into a valid repository URI.

        'cwd' is the working directory to use to turn paths into an absolute
        path.  If not provided, the current working directory is used.

        if uri.find("://") == -1 and not uri.startswith("file:/"):
                # Convert the file path to a URI.
                if not cwd:
                        uri = os.path.abspath(uri)
                elif not os.path.isabs(uri):
                        uri = os.path.normpath(os.path.join(cwd, uri))

                uri = urlunparse(("file", "",
                    pathname2url(uri), "", "", ""))

        scheme, netloc, path, params, query, fragment = \
            urlparse(uri, "file", allow_fragments=0)
        scheme = scheme.lower()

        if scheme == "file":
                # During urlunparsing below, ensure that the path starts with
                # only one '/' character, if any are present.
                if path.startswith("/"):
                        path = "/" + path.lstrip("/")

        # Rebuild the URI with the sanitized components.
        return urlunparse((scheme, netloc, path, params,
            query, fragment))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def execute(self, http=None, num_retries=0):
    """Execute the request.

      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry 500's with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

      A deserialized object model of the response body as determined
      by the postproc.

      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occured.
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def execute(self, http=None, num_retries=0):
    """Execute the request.

      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

      A deserialized object model of the response body as determined
      by the postproc.

      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occured.
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    # Assume that a GET request never contains a request body.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None:
      credentials = _auth.get_credentials_from_http(request.http)
      if credentials is not None:
        _auth.apply_credentials(credentials, headers)

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:panko    作者:openstack    | 项目源码 | 文件源码
def start_fixture(self):
        """Set up config."""

        global LOAD_APP_KWARGS

        self.conf = None

        # Determine the database connection.
        db_url = os.environ.get('PIFPAF_URL', "sqlite://").replace(
            "mysql://", "mysql+pymysql://")
        if not db_url:
            raise case.SkipTest('No database connection configured')

        conf = self.conf = service.prepare_service([], [])

        content = ('{"default": ""}')
        if six.PY3:
            content = content.encode('utf-8')
        self.tempfile = fileutils.write_to_tempfile(content=content,

        conf.set_override("policy_file", self.tempfile,

        parsed_url = list(urlparse.urlparse(db_url))
        parsed_url[2] += '-%s' % uuidutils.generate_uuid(dashed=False)
        db_url = urlparse.urlunparse(parsed_url)

        conf.set_override('connection', db_url, group='database')

        if (parsed_url[0].startswith("mysql")
           or parsed_url[0].startswith("postgresql")):

        self.conn = storage.get_connection_from_config(self.conf)

        LOAD_APP_KWARGS = {
            'conf': conf, 'appname': 'panko+noauth',
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def url_for(self, resource='', query=None, params='', fragment='',
        """Generate url for resource

        Use endpoint for generation

        Ex. resource = 'one/two/three'
            result - http://localhost:5000/api/one/two/three/
            if endpoint == http://localhost:5000/api/

        :param resource: str
        :param query: dict for generate query string {a: 1, b: 2} -> ?a=1&b=2, or string
        :param params: params for last path url
        :param fragment: #fragment
        :return: str, url
        parsed_url = list(urlparse.urlparse(self.endpoint))
        if resource:
            path = self.path + '/' + resource
            path = self.path
        if self.close_slash:
            if not path.endswith('/'):
                path += '/'
        if not params:
            params = self.params
        if not fragment:
            fragment = self.fragment
        parsed_url[2] = path
        parsed_url[3] = params
        parsed_url[5] = fragment
        if self.query:
            parsed_url[4] = urlencode(self.query, doseq=1)
        if query is not None:
            if keep_blank_values is None:
                keep_blank_values = self.keep_blank_values
            if isinstance(query, six.string_types):
                query = urlparse.parse_qs(query,
            req_query = dict(self.query)
            req_query = urlencode(req_query, doseq=1)
            parsed_url[4] = req_query
        url = urlparse.urlunparse(parsed_url)
        self.logger.debug('Url %s built for resource "%s"', url, resource)
        return url
项目:django-rangepaginator    作者:mvantellingen    | 项目源码 | 文件源码
def paginate(page=None, request=None, distance=2, edge=1, extra_class='',
    paginator = page.paginator
    pages = calculate_pages(
        page.number, paginator.num_pages, distance=distance, edge=edge)

    prev_page_url = next_page_url = None
    result = []
    if request:
        parts = urlparse(request.get_full_path())
        params = parse_qs(parts.query)
        for page_num in pages:
            if not page_num:
                result.append((page_num, None))
                params['page'] = [str(page_num)]
                query = urlencode(params, doseq=True)
                url = urlunparse(parts[:4] + (query,) + parts[5:])
                result.append((page_num, url))

        if page.has_previous():
            params['page'] = page.previous_page_number()
            query = urlencode(params, doseq=True)
            prev_page_url = urlunparse(parts[:4] + (query,) + parts[5:])

        if page.has_next():
            params['page'] = page.next_page_number()
            query = urlencode(params, doseq=True)
            next_page_url = urlunparse(parts[:4] + (query,) + parts[5:])

        for page_num in pages:
            url = '?%s' % urlencode({'page': str(page_num)})
            result.append((page_num, url))

        if page.has_previous():
            prev_page_url = '?%s' % urlencode({
                'page': str(page.previous_page_number())

        if page.has_next():
            next_page_url = '?%s' % urlencode({
                'page': str(page.next_page_number())

    pages = result

    return {
        'current': page.number,
        'page': page,
        'pages': pages,
        'paginator': paginator,
        'next_page_url': next_page_url,
        'prev_page_url': prev_page_url,
        'extra_class': extra_class,
        'text_labels': text_labels,
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def _list_pagination(self, url, response_key=None, obj_class=None,
        """Retrieve a list of items.

        The Zun API is configured to return a maximum number of
        items per request, (FIXME: see Zun's api.max_limit option). This
        iterates over the 'next' link (pagination) in the responses,
        to get the number of items specified by 'limit'. If 'limit'
        is None this function will continue pagination until there are
        no more values to be returned.

        :param url: a partial URL, e.g. '/nodes'
        :param response_key: the key to be looked up in response
            dictionary, e.g. 'nodes'
        :param obj_class: class for constructing the returned objects.
        :param limit: maximum number of items to return. If None returns

        if obj_class is None:
            obj_class = self.resource_class

        if limit is not None:
            limit = int(limit)

        object_list = []
        object_count = 0
        limit_reached = False
        while url:
            resp, body = self.api.json_request('GET', url)
            data = self._format_body_data(body, response_key)
            for obj in data:
                object_list.append(obj_class(self, obj, loaded=True))
                object_count += 1
                if limit and object_count >= limit:
                    # break the for loop
                    limit_reached = True

            # break the while loop and return
            if limit_reached:

            url = body.get('next')
            if url:
                # NOTE(lucasagomes): We need to edit the URL to remove
                # the scheme and netloc
                url_parts = list(urlparse.urlparse(url))
                url_parts[0] = url_parts[1] = ''
                url = urlparse.urlunparse(url_parts)

        return object_list
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def execute(self, http=None, num_retries=0):
    """Execute the request.

      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

      A deserialized object model of the response body as determined
      by the postproc.

      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occured.
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def temp_url(self, lifetime=300, method='GET', inline=True, filename=None):
        """Obtains a temporary URL to an object.

            lifetime (int): The time (in seconds) the temporary
                URL will be valid
            method (str): The HTTP method that can be used on
                the temporary URL
            inline (bool, default True): If False, URL will have a
                Content-Disposition header that causes browser to download as
            filename (str, optional): A urlencoded filename to use for
                attachment, otherwise defaults to object name
        global_options = settings.get()['swift']
        auth_url = global_options.get('auth_url')
        temp_url_key = global_options.get('temp_url_key')

        if not self.resource:
            raise ValueError('can only create temporary URL on object')
        if not temp_url_key:
            raise ValueError(
                'a temporary url key must be set with settings.update '
                'or by setting the OS_TEMP_URL_KEY environment variable')
        if not auth_url:
            raise ValueError(
                'an auth url must be set with settings.update '
                'or by setting the OS_AUTH_URL environment variable')

        obj_path = '/v1/%s' % self[len(]
        # Generate the temp url using swifts helper. Note that this method is ONLY
        # useful for obtaining the temp_url_sig and the temp_url_expires parameters.
        # These parameters will be used to construct a properly-escaped temp url
        obj_url = generate_temp_url(obj_path, lifetime, temp_url_key, method)
        query_begin = obj_url.rfind('temp_url_sig', 0, len(obj_url))
        obj_url_query = obj_url[query_begin:]
        obj_url_query = dict(parse.parse_qsl(obj_url_query))

        query = ['temp_url_sig=%s' % obj_url_query['temp_url_sig'],
                 'temp_url_expires=%s' % obj_url_query['temp_url_expires']]
        if inline:
        if filename:
            query.append('filename=%s' % parse.quote(filename))

        auth_url_parts = parse.urlparse(auth_url)
        return parse.urlunparse((auth_url_parts.scheme,
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:alfredToday    作者:jeeftor    | 项目源码 | 文件源码
def execute(self, http=None, num_retries=0):
    """Execute the request.

      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry 500's with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

      A deserialized object model of the response body as determined
      by the postproc.

      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occured.
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
项目:alfredToday    作者:jeeftor    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:Webradio_v2    作者:Acer54    | 项目源码 | 文件源码
def execute(self, http=None, num_retries=0):
    """Execute the request.

      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

      A deserialized object model of the response body as determined
      by the postproc.

      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occured.
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
项目:Webradio_v2    作者:Acer54    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:GAMADV-X    作者:taers232c    | 项目源码 | 文件源码
def execute(self, http=None, num_retries=0):
    """Execute the request.

      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

      A deserialized object model of the response body as determined
      by the postproc.

      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occured.
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    # Assume that a GET request never contains a request body.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
项目:GAMADV-X    作者:taers232c    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None:
      credentials = _auth.get_credentials_from_http(request.http)
      if credentials is not None:
        _auth.apply_credentials(credentials, headers)

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
项目:python-iotronicclient    作者:openstack    | 项目源码 | 文件源码
def _list_pagination(self, url, response_key=None, obj_class=None,
        """Retrieve a list of items.

        The Iotronic API is configured to return a maximum number of
        items per request, (see Iotronic's api.max_limit option). This
        iterates over the 'next' link (pagination) in the responses,
        to get the number of items specified by 'limit'. If 'limit'
        is None this function will continue pagination until there are
        no more values to be returned.

        :param url: a partial URL, e.g. '/boards'
        :param response_key: the key to be looked up in response
            dictionary, e.g. 'boards'
        :param obj_class: class for constructing the returned objects.
        :param limit: maximum number of items to return. If None returns

        if obj_class is None:
            obj_class = self.resource_class

        if limit is not None:
            limit = int(limit)

        object_list = []
        object_count = 0
        limit_reached = False
        while url:
            resp, body = self.api.json_request('GET', url)
            data = self._format_body_data(body, response_key)
            for obj in data:
                object_list.append(obj_class(self, obj, loaded=True))
                object_count += 1
                if limit and object_count >= limit:
                    # break the for loop
                    limit_reached = True

            # break the while loop and return
            if limit_reached:

            url = body.get('next')
            if url:
                # NOTE(lucasagomes): We need to edit the URL to remove
                # the scheme and netloc
                url_parts = list(urlparse.urlparse(url))
                url_parts[0] = url_parts[1] = ''
                url = urlparse.urlunparse(url_parts)

        return object_list
项目:share-class    作者:junyiacademy    | 项目源码 | 文件源码
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

      request: HttpRequest, the request to serialize.

      The request as a string in application/http format.
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc

    if request.body is not None:
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body