Python urllib.parse 模块,urlencode() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用urllib.parse.urlencode()

项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def url_builder(self, endpoint, *, root=None, params=None, url_params=None):
        """Create a URL for the specified endpoint.

        Arguments:
          endpoint (:py:class:`str`): The API endpoint to access.
          root: (:py:class:`str`, optional): The root URL for the
            service API.
          params: (:py:class:`dict`, optional): The values for format
            into the created URL (defaults to ``None``).
          url_params: (:py:class:`dict`, optional): Parameters to add
            to the end of the URL (defaults to ``None``).

        Returns:
          :py:class:`str`: The resulting URL.

        """
        if root is None:
            root = self.ROOT
        return ''.join([
            root,
            endpoint,
            '?' + urlencode(url_params) if url_params else '',
        ]).format(**params or {})
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def existing_tags(target_uri, h):#, doi, text, h):
    params = {
        'limit':200,
        'uri':target_uri,
        'group':h.group,
        'user':h.username,
    }
    query_url = h.query_url_template.format(query=urlencode(params, True))
    obj = h.authenticated_api_query(query_url)
    rows = obj['rows']
    tags = {}
    unresolved_exacts = {}
    for row in rows:
        for tag in row['tags']:
            if tag.startswith('RRID:'):
                tags[tag] = row['id']
            elif tag.startswith('PMID:'):
                tags[tag] = row['id']
            elif tag.startswith('DOI:'):
                tags[tag] = row['id']
            elif tag == 'RRIDCUR:Unresolved':
                unresolved_exacts[row['target'][0]['selector'][0]['exact']] = row['id']
    return tags, unresolved_exacts
项目:docklet    作者:unias    | 项目源码 | 文件源码
def net_billings(self, username, now_bytes_total):
        global monitor_vnodes
        if not username in self.net_lastbillings.keys():
            self.net_lastbillings[username] = 0
        elif int(now_bytes_total/self.bytes_per_beans) < self.net_lastbillings[username]:
            self.net_lastbillings[username] = 0
        diff = int(now_bytes_total/self.bytes_per_beans) - self.net_lastbillings[username]
        if diff > 0:
            auth_key = env.getenv('AUTH_KEY')
            data = {"owner_name":username,"billing":diff, "auth_key":auth_key}
            header = {'Content-Type':'application/x-www-form-urlencoded'}
            http = Http()
            [resp,content] = http.request("http://"+self.master_ip+"/billing/beans/","POST",urlencode(data),headers = header)
            logger.info("response from master:"+content.decode('utf-8'))
        self.net_lastbillings[username] += diff
        monitor_vnodes[username]['net_stats']['net_billings'] = self.net_lastbillings[username]
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:ccu_and_eccu_publish    作者:gaofubin    | 项目源码 | 文件源码
def getCsvReport(product_list, startdate, enddate, source_obj):
        print
        print ("Requesting a csv report for the given time period")
        headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8','Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'}
        path = "/billing-usage/v1/contractUsageData/csv"

        parameters = {  "reportSources" :[source_obj],
            "products"  :product_list,
                        "startDate"     :startdate,
                        "endDate"       :enddate
                }
        print
        data_string = parse.urlencode({p: json.dumps(parameters[p]) for p in parameters})
        products_result = session.post(parse.urljoin(baseurl,path),data=data_string, headers=headers)
        products_csv = products_result.text
        return products_csv
项目:flask-restler    作者:klen    | 项目源码 | 文件源码
def make_pagination_headers(limit, curpage, total, link_header=True):
    """Return Link Hypermedia Header."""
    lastpage = int(math.ceil(1.0 * total / limit) - 1)
    headers = {'X-Total-Count': str(total), 'X-Limit': str(limit),
               'X-Page-Last': str(lastpage), 'X-Page': str(curpage)}

    if not link_header:
        return headers

    base = "{}?%s".format(request.path)
    links = {}
    links['first'] = base % urlencode(dict(request.args, **{PAGE_ARG: 0}))
    links['last'] = base % urlencode(dict(request.args, **{PAGE_ARG: lastpage}))
    if curpage:
        links['prev'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage - 1}))
    if curpage < lastpage:
        links['next'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage + 1}))

    headers['Link'] = ",".join(['<%s>; rel="%s"' % (v, n) for n, v in links.items()])
    return headers


# pylama:ignore=R0201
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:meg-server    作者:Argonne-National-Laboratory    | 项目源码 | 文件源码
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))
    return message
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:waybackscraper    作者:abrenaut    | 项目源码 | 文件源码
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

        timestamps = []
        # Ignore the first line which contains column names
        for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
            # Ignore archives with a status code != OK
            if status_code == '200':
                timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:Emoji-Web    作者:emoji-gen    | 项目源码 | 文件源码
def url_for(
        text,
        font,
        color,
        back_color,
        size_fixed = False,
        align = 'center',
        stretch = True
        ):
    base_url = app.config['SITE_BASE_URL']
    payload = {
            'text': text,
            'font': font,
            'color': color,
            'back_color': back_color,
            'size_fixed': str(size_fixed).lower(),
            'align': align,
            'stretch': str(stretch).lower()
            }
    return urljoin(base_url, 'emoji') + '?' + urlencode(payload)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:YoWhenReady    作者:jnsdrtlf    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def respond_to_checkpoint(self, response_code):
        headers = {
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        }

        req = Request(self.endpoint, headers=headers)
        data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
        res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

        if res.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(res.read())
            content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
        else:
            content = res.read().decode('utf-8')

        return res.code, content
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def generate_request_url(self, scopes: tuple):
        """Generate OAuth request url.

        :param scopes: github access scopes
                       (https://developer.github.com/v3/oauth/#scopes)
        :param callback: callback to be called when user authorize request
                         (may be coroutine)
        """
        state = base64.b64encode(os.urandom(15)).decode("ascii")
        self._requested_scopes[state] = scopes
        qs = parse.urlencode({
            "client_id": self._client_id,
            "scope": ",".join(scopes),
            "state": state,
        })
        return "%s?%s" % (REQ_ACCESS_URL, qs)
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def test_05multiple_request(self):
        response = self.fetch('/cluster?{}'.format(
            parse.urlencode({'method': 'node_config',
                             'cluster': 'my cluster',
                             'node': server1})),
            headers={'Key': 'new key'})
        self.assertEqual(response.code, 200)
        commands = json.loads(response.body.decode('utf-8'))
        self.assertEqual(len(commands), 1)
        self.assertEqual(commands[0][:30], 'python3 valuation_standalone_m')

        response = self.fetch('/cluster?{}'.format(
            parse.urlencode({'method': 'node_config',
                             'cluster': 'my cluster',
                             'node': server2})),
            headers={'Key': 'new key'})
        self.assertEqual(response.code, 200)
        commands = json.loads(response.body.decode('utf-8'))
        print(response)
        self.assertEqual(commands[0][:27], 'python3 valuation_worker.py')
        self.assertEqual(commands[1][:27], 'python3 valuation_worker.py')
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def cluster_list(self):
        """
        Get the list of all the clusters created by this user

        :return: A dictionary with all the clusters
        """
        arguments = {
            'method': 'clusters_list'
        }
        client = HTTPClient()
        response = client.fetch('{}/cluster?{}'.format(
            self.uri, parse.urlencode(arguments)),
            headers={'Key': self.uk}
        )

        if response.code == 200:
            return json.loads(response.body.decode('utf-8'))
        else:
            raise ValueError(response.body.decode('utf-8'))
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def request(self, cluster_key, configuration):
        """
        As an available resource, pass the configuration to the registry
        and get the commands that have to be run.

        :param cluster_key: Key of the cluster the resource wants to connect to
        :param configuration: String with the configuration of the resource
        :return: List with the commands that have to be run.
        """
        arguments = {
            'method': 'node_config',
            'cluster': cluster_key,
            'node': configuration
        }
        client = HTTPClient()
        response = client.fetch('{}/cluster?{}'.format(
            self.uri, parse.urlencode(arguments)),
            headers={'Key': self.uk}
        )

        if response.code == 200:
            return json.loads(response.body.decode('utf-8'))
        else:
            raise ValueError(response.body.decode('utf-8'))
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def cluster_status(self, cluster_key):
        """
        Get the present status of the cluster

        :param cluster_key: Key that identifies the cluster
        :return: The status of the cluster
        """
        arguments = {
            'method': 'cluster_status',
            'cluster': cluster_key
        }
        client = HTTPClient()
        response = client.fetch('{}/cluster?{}'.format(
            self.uri, parse.urlencode(arguments)),
            headers={'Key': self.uk}
        )

        if response.code == 200:
            if response.body:
                return json.loads(response.body.decode('utf-8'))
            else:
                return "Empty"
        else:
            raise ValueError(response.body.decode('utf-8'))
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def cluster_reset(self, cluster_key):
        """
        Reset the status of a cluster. This means that all the temporary
        information about which resource request is forgotten. This may
        leave configured resources not properly configured, so handle with
        care.

        :param cluster_key:
        :return: Key of the cluster being reset
        """
        arguments = {
            'method': 'cluster_reset',
            'cluster': cluster_key
        }
        client = HTTPClient()
        response = client.fetch('{}/cluster?{}'.format(
            self.uri, parse.urlencode(arguments)),
            headers={'Key': self.uk}
        )

        if response.code == 200:
            return response.body.decode('utf-8')
        else:
            raise ValueError(response.body.decode('utf-8'))
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def request(uri, cluster_key, configuration):
    """
    As an available resource, pass the configuration to the registry
    and get the commands that have to be run.

    :param uri: Address of the Registry
    :param cluster_key: Key of the cluster the resource wants to connect to
    :param configuration: String with the configuration of the resource
    :return: List with the commands that have to be run.
    """
    arguments = {
        'method': 'node_config',
        'cluster': cluster_key,
        'node': configuration
    }
    client = HTTPClient()
    response = client.fetch('{}/cluster?{}'.format(
        uri, parse.urlencode(arguments))
    )

    if response.code == 200:
        return json.loads(response.body.decode('utf-8'))
    else:
        raise ValueError(response.body.decode('utf-8'))
项目:pylm-registry    作者:nfqsolutions    | 项目源码 | 文件源码
def download(self, fr=None, to=None):
        """
        Download the log lines, that you may filter by time

        :param fr: datetime. Log lines from
        :param to: datetime. Log lines to
        :return: A list with dicts
        """
        arguments = {
            'cluster': self.cluster
        }

        if fr:
            arguments['fr'] = fr

        if to:
            arguments['to'] = to

        client = HTTPClient()
        response = client.fetch('{}/logs?{}'.format(
            self.uri, parse.urlencode(arguments)),
        )

        return json.loads(response.body.decode('utf-8'))
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def unicode_urlencode(query, doseq=True):
    """
    Custom wrapper around urlencode to support unicode

    Python urlencode doesn't handle unicode well so we need to convert to
    bytestrings before using it:
    http://stackoverflow.com/questions/6480723/urllib-urlencode-doesnt-like-unicode-values-how-about-this-workaround
    """
    pairs = []
    for key, value in query.items():
        if isinstance(value, list):
            value = list(map(to_utf8, value))
        else:
            value = to_utf8(value)
        pairs.append((to_utf8(key), value))
    encoded_query = dict(pairs)
    return urlencode(encoded_query, doseq)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def __init__(self, credentials, host, request_uri, headers, response, content, http):
        from urllib.parse import urlencode
        Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
        challenge = _parse_www_authenticate(response, 'www-authenticate')
        service = challenge['googlelogin'].get('service', 'xapi')
        # Bloggger actually returns the service in the challenge
        # For the rest we guess based on the URI
        if service == 'xapi' and  request_uri.find("calendar") > 0:
            service = "cl"
        # No point in guessing Base or Spreadsheet
        #elif request_uri.find("spreadsheets") > 0:
        #    service = "wise"

        auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
        resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
        lines = content.split('\n')
        d = dict([tuple(line.split("=", 1)) for line in lines if line])
        if resp.status == 403:
            self.Auth = ""
        else:
            self.Auth = d['Auth']
项目:alexa-stackoverflow    作者:benvand    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def test_auth_url(self):
        perms = ['email', 'birthday']
        redirect_url = 'https://localhost/facebook/callback/'

        expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode(
            dict(client_id=self.app_id,
                 redirect_uri=redirect_url,
                 scope=','.join(perms)))
        actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

        # Since the order of the query string parameters might be
        # different in each URL, we cannot just compare them to each
        # other.
        expected_url_result = urlparse(expected_url)
        actual_url_result = urlparse(actual_url)
        expected_query = parse_qs(expected_url_result.query)
        actual_query = parse_qs(actual_url_result.query)

        self.assertEqual(actual_url_result.scheme, expected_url_result.scheme)
        self.assertEqual(actual_url_result.netloc, expected_url_result.netloc)
        self.assertEqual(actual_url_result.path, expected_url_result.path)
        self.assertEqual(actual_url_result.params, expected_url_result.params)
        self.assertEqual(actual_query, expected_query)
项目:danmu-bilibili    作者:saberxxy    | 项目源码 | 文件源码
def _getpage(self, page):
        data = {
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        }
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(data)
        try:
            response = requests.get(url)
            if response.status_code != 200:
                return None
            html_cont = response.text
            return html_cont
        except RequestException:
            return None
项目:danmu-bilibili    作者:saberxxy    | 项目源码 | 文件源码
def _getpage(self, page):
        data = {
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        }
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(data)
        try:
            response = requests.get(url)
            if response.status_code != 200:
                return None
            html_cont = response.text
            return html_cont
        except RequestException:
            return None
项目:danmu-bilibili    作者:saberxxy    | 项目源码 | 文件源码
def _getpage(self, page):
        data = {
            'mid': str(self._mid),
            'pagesize': '30',
            'tid': '0',
            'page': str(page),
            'keyword': '',
            'order': 'senddate',
            '_': '1496812411295'
        }
        # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
        # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
        url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(data)
        try:
            response = requests.get(url)
            if response.status_code != 200:
                return None
            html_cont = response.text
            return html_cont
        except RequestException:
            return None
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def _tpl_send_sms(phone, tpl_id, tpl_value):
    """
    ???????
    tpl_value = {'#code#':'1234','#company#':'???'}
    """
    if not isValidPhone(phone):
        return False
    if isTestPhone(phone):
        return False
    apikey = settings.YUNPIAN_API_KEY # get apikey by global settings
    params = {'apikey': apikey, 'tpl_id': tpl_id, 'tpl_value': urlencode(tpl_value), 'mobile': phone}
    url = "https://sms.yunpian.com/v1/sms/tpl_send.json"
    headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}
    response = requests.post(url, headers=headers, data=params)
    if response.status_code != 200:
        _logger.error('cannot reach sms server, http_status is %s' % (response.status_code))
        raise SendSMSError("sms server status code is {status_code}".format(status_code=response.status_code))
    else:
        content = response.content.decode()
        data = json.loads(content)
        if data["code"] != 0:
            _logger.error('sms server response error, CODE: %s, MSG: %s(%s)' % (data.get('code'), data.get('msg'), data.get('detail')))
            raise SendSMSError("sms server error. {error_msg}".format(error_msg=content))
    return response
项目:whatsapp-rest-webservice    作者:svub    | 项目源码 | 文件源码
def sendRequest(host, port, path, headers, params, reqType="GET"):

        params = urlencode(params)


        path = path + "?"+ params if reqType == "GET" and params else path

        if len(headers):
            logger.debug(headers)
        if len(params):
            logger.debug(params)

        logger.debug("Opening connection to %s" % host);
        conn = httplib.HTTPSConnection(host ,port) if port == 443 else httplib.HTTPConnection(host ,port)

        logger.debug("Sending %s request to %s" % (reqType, path))
        conn.request(reqType, path, params, headers);

        response = conn.getresponse()
        return response
项目:flask_atlassian_connect    作者:halkeye    | 项目源码 | 文件源码
def _atlassian_jwt_post_token(self):
        if not getattr(g, 'ac_client', None):
            return dict()

        args = request.args.copy()
        try:
            del args['jwt']
        except KeyError:
            pass

        signature = encode_token(
            'POST',
            request.path + '?' + urlencode(args),
            g.ac_client.clientKey,
            g.ac_client.sharedSecret)
        args['jwt'] = signature
        return dict(atlassian_jwt_post_url=request.path + '?' + urlencode(args))
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def create_new_paste(contents):
    """
    Creates a new paste using bpaste.net service.
    :contents: paste contents as utf-8 encoded bytes
    :returns: url to the pasted contents
    """

    params = {
        'code': contents,
        'lexer': 'python3' if sys.version_info[0] == 3 else 'python',
        'expiry': '1week',
    }
    url = 'https://bpaste.net'
    response = urlopen(url, data=urlencode(params).encode('ascii')).read()
    m = re.search(r'href="/raw/(\w+)"', response.decode('utf-8'))
    if m:
        return '%s/show/%s' % (url, m.group(1))
    else:
        return 'bad response: ' + response
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def request_encode_url(self, method, url, fields=None, headers=None,
                           **urlopen_kw):
        """
        Make a request using :meth:`urlopen` with the ``fields`` encoded in
        the url. This is useful for request methods like GET, HEAD, DELETE, etc.
        """
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)