我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用httplib2.DEFAULT_MAX_REDIRECTS。
def request(self, uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): scheme = urlparse.urlparse(uri).scheme request = (uri, method, body, headers) if scheme == 'http': self.throttle_handler.http_request(request) elif scheme == 'https': self.throttle_handler.https_request(request) response = super(_ThrottledHttp, self).request( uri, method, body, headers, redirections, connection_type) if scheme == 'http': self.throttle_handler.http_response(request, response) elif scheme == 'https': self.throttle_handler.https_response(request, response) return response
def set_user_agent(http, user_agent): """Set the user-agent on every request. Args: http - An instance of httplib2.Http or something that acts like it. user_agent: string, the value for the user-agent header. Returns: A modified instance of http that was passed in. Example: h = httplib2.Http() h = set_user_agent(h, "my-app-name/6.0") Most of the time the user-agent will be set doing auth, this is for the rare cases where you are accessing an unauthenticated endpoint. """ request_orig = http.request # The closure that will replace 'httplib2.Http.request'. def new_request(uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): """Modify the request headers to add the user-agent.""" if headers is None: headers = {} if 'user-agent' in headers: headers['user-agent'] = user_agent + ' ' + headers['user-agent'] else: headers['user-agent'] = user_agent resp, content = request_orig(uri, method, body, headers, redirections, connection_type) return resp, content http.request = new_request return http
def tunnel_patch(http): """Tunnel PATCH requests over POST. Args: http - An instance of httplib2.Http or something that acts like it. Returns: A modified instance of http that was passed in. Example: h = httplib2.Http() h = tunnel_patch(h, "my-app-name/6.0") Useful if you are running on a platform that doesn't support PATCH. Apply this last if you are using OAuth 1.0, as changing the method will result in a different signature. """ request_orig = http.request # The closure that will replace 'httplib2.Http.request'. def new_request(uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): """Modify the request headers to add the user-agent.""" if headers is None: headers = {} if method == 'PATCH': if 'oauth_token' in headers.get('authorization', ''): logging.warning( 'OAuth 1.0 request made with Credentials after tunnel_patch.') headers['x-http-method-override'] = "PATCH" method = 'POST' resp, content = request_orig(uri, method, body, headers, redirections, connection_type) return resp, content http.request = new_request return http
def request(http, uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): """Make an HTTP request with an HTTP object and arguments. Args: http: httplib2.Http, an http object to be used to make requests. uri: string, The URI to be requested. method: string, The HTTP method to use for the request. Defaults to 'GET'. body: string, The payload / body in HTTP request. By default there is no payload. headers: dict, Key-value pairs of request headers. By default there are no headers. redirections: int, The number of allowed 203 redirects for the request. Defaults to 5. connection_type: httplib.HTTPConnection, a subclass to be used for establishing connection. If not set, the type will be determined from the ``uri``. Returns: tuple, a pair of a httplib2.Response with the status code and other headers and the bytes of the content returned. """ # NOTE: Allowing http or http.request is temporary (See Issue 601). http_callable = getattr(http, 'request', http) return http_callable(uri, method=method, body=body, headers=headers, redirections=redirections, connection_type=connection_type)
def tunnel_patch(http): """Tunnel PATCH requests over POST. Args: http - An instance of httplib2.Http or something that acts like it. Returns: A modified instance of http that was passed in. Example: h = httplib2.Http() h = tunnel_patch(h, "my-app-name/6.0") Useful if you are running on a platform that doesn't support PATCH. Apply this last if you are using OAuth 1.0, as changing the method will result in a different signature. """ request_orig = http.request # The closure that will replace 'httplib2.Http.request'. def new_request(uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): """Modify the request headers to add the user-agent.""" if headers is None: headers = {} if method == 'PATCH': if 'oauth_token' in headers.get('authorization', ''): LOGGER.warning( 'OAuth 1.0 request made with Credentials after tunnel_patch.') headers['x-http-method-override'] = "PATCH" method = 'POST' resp, content = request_orig(uri, method, body, headers, redirections, connection_type) return resp, content http.request = new_request return http
def request(self, uri, method="GET", body=b'', headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): DEFAULT_POST_CONTENT_TYPE = 'application/x-www-form-urlencoded' if not isinstance(headers, dict): headers = {} if method == "POST": headers['Content-Type'] = headers.get('Content-Type', DEFAULT_POST_CONTENT_TYPE) is_form_encoded = \ headers.get('Content-Type') == 'application/x-www-form-urlencoded' if is_form_encoded and body: parameters = parse_qs(body) else: parameters = None req = Request.from_consumer_and_token(self.consumer, token=self.token, http_method=method, http_url=uri, parameters=parameters, body=body, is_form_encoded=is_form_encoded) req.sign_request(self.method, self.consumer, self.token) scheme, netloc, path, params, query, fragment = urlparse(uri) realm = urlunparse((scheme, netloc, '', None, None, None)) if is_form_encoded: body = req.to_postdata() elif method == "GET": uri = req.to_url() else: headers.update(req.to_header(realm=realm)) return httplib2.Http.request(self, uri, method=method, body=body, headers=headers, redirections=redirections, connection_type=connection_type)
def test_multipart_post_does_not_alter_body(self, mockHttpRequest): random_result = random.randint(1,100) data = { 'rand-%d'%random.randint(1,100):random.randint(1,100), } content_type, body = self.create_simple_multipart_data(data) client = oauth.Client(self.consumer, None) uri = self._uri('two_legged') def mockrequest(cl, ur, **kw): self.assertTrue(cl is client) self.assertTrue(ur is uri) self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers'])) self.assertEqual(kw['body'], body) self.assertEqual(kw['connection_type'], None) self.assertEqual(kw['method'], 'POST') self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS) self.assertTrue(isinstance(kw['headers'], dict)) return random_result mockHttpRequest.side_effect = mockrequest result = client.request(uri, 'POST', headers={'Content-Type':content_type}, body=body) self.assertEqual(result, random_result)
def test_url_with_query_string(self, mockHttpRequest): uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf' client = oauth.Client(self.consumer, None) random_result = random.randint(1,100) def mockrequest(cl, ur, **kw): self.assertTrue(cl is client) self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers'])) self.assertEqual(kw['body'], b'') self.assertEqual(kw['connection_type'], None) self.assertEqual(kw['method'], 'GET') self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS) self.assertTrue(isinstance(kw['headers'], dict)) req = oauth.Request.from_consumer_and_token(self.consumer, None, http_method='GET', http_url=uri, parameters={}) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None) expected = parse_qsl( urlparse(req.to_url()).query) actual = parse_qsl(urlparse(ur).query) self.assertEqual(len(expected), len(actual)) actual = dict(actual) for key, value in expected: if key not in ('oauth_signature', 'oauth_nonce', 'oauth_timestamp'): self.assertEqual(actual[key], value) return random_result mockHttpRequest.side_effect = mockrequest client.request(uri, 'GET')
def wrap_http_for_jwt_access(credentials, http): """Prepares an HTTP object's request method for JWT access. Wraps HTTP requests with logic to catch auth failures (typically identified via a 401 status code). In the event of failure, tries to refresh the token used and then retry the original request. Args: credentials: _JWTAccessCredentials, the credentials used to identify a service account that uses JWT access tokens. http: httplib2.Http, an http object to be used to make auth requests. """ orig_request_method = http.request wrap_http_for_auth(credentials, http) # The new value of ``http.request`` set by ``wrap_http_for_auth``. authenticated_request_method = http.request # The closure that will replace 'httplib2.Http.request'. def new_request(uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): if 'aud' in credentials._kwargs: # Preemptively refresh token, this is not done for OAuth2 if (credentials.access_token is None or credentials.access_token_expired): credentials.refresh(None) return request(authenticated_request_method, uri, method, body, headers, redirections, connection_type) else: # If we don't have an 'aud' (audience) claim, # create a 1-time token with the uri root as the audience headers = _initialize_headers(headers) _apply_user_agent(headers, credentials.user_agent) uri_root = uri.split('?', 1)[0] token, unused_expiry = credentials._create_token({'aud': uri_root}) headers['Authorization'] = 'Bearer ' + token return request(orig_request_method, uri, method, body, clean_headers(headers), redirections, connection_type) # Replace the request method with our own closure. http.request = new_request # Set credentials as a property of the request method. http.request.credentials = credentials
def request(self, uri, method="GET", body='', headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): DEFAULT_POST_CONTENT_TYPE = 'application/x-www-form-urlencoded' if not isinstance(headers, dict): headers = {} if method == "POST": headers['Content-Type'] = headers.get('Content-Type', DEFAULT_POST_CONTENT_TYPE) is_form_encoded = \ headers.get('Content-Type') == 'application/x-www-form-urlencoded' if is_form_encoded and body: parameters = parse_qs(body) else: parameters = None req = Request.from_consumer_and_token(self.consumer, token=self.token, http_method=method, http_url=uri, parameters=parameters, body=body, is_form_encoded=is_form_encoded) req.sign_request(self.method, self.consumer, self.token) schema, rest = urllib.splittype(uri) if rest.startswith('//'): hierpart = '//' else: hierpart = '' host, rest = urllib.splithost(rest) realm = schema + ':' + hierpart + host if is_form_encoded: body = req.to_postdata() elif method == "GET": uri = req.to_url() else: headers.update(req.to_header(realm=realm)) return httplib2.Http.request(self, uri, method=method, body=body, headers=headers, redirections=redirections, connection_type=connection_type)
def authorize(self, http): """Authorize an httplib2.Http instance with a JWT assertion. Unless specified, the 'aud' of the assertion will be the base uri of the request. Args: http: An instance of ``httplib2.Http`` or something that acts like it. Returns: A modified instance of http that was passed in. Example:: h = httplib2.Http() h = credentials.authorize(h) """ request_orig = http.request request_auth = super(_JWTAccessCredentials, self).authorize(http).request # The closure that will replace 'httplib2.Http.request'. def new_request(uri, method='GET', body=None, headers=None, redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None): if 'aud' in self._kwargs: # Preemptively refresh token, this is not done for OAuth2 if self.access_token is None or self.access_token_expired: self.refresh(None) return request_auth(uri, method, body, headers, redirections, connection_type) else: # If we don't have an 'aud' (audience) claim, # create a 1-time token with the uri root as the audience headers = _initialize_headers(headers) _apply_user_agent(headers, self.user_agent) uri_root = uri.split('?', 1)[0] token, unused_expiry = self._create_token({'aud': uri_root}) headers['Authorization'] = 'Bearer ' + token return request_orig(uri, method, body, clean_headers(headers), redirections, connection_type) # Replace the request method with our own closure. http.request = new_request return http