我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.structures.CaseInsensitiveDict()。
def test_data_error_raised(): class Person(Model): name = StringType(required=True, min_length=4) class PersonAPI(HTTPEater): request_cls = Person response_cls = Person url = 'http://example.com/person' api = PersonAPI(name='John') with pytest.raises(DataError): with requests_mock.Mocker() as mock: mock.get( api.url, json={'name': 'Joh'}, headers=CaseInsensitiveDict({ 'Content-Type': 'application/json' }) ) api()
def test_non_json_content_response(): class GetPersonAPI(HTTPEater): request_cls = Model response_cls = Model url = 'http://example.com/' api = GetPersonAPI() with requests_mock.Mocker() as mock: mock.get( 'http://example.com/', text='Hello world', headers=CaseInsensitiveDict({ 'Content-Type': 'text/plain' }) ) with pytest.raises(NotImplementedError): api()
def __init__(self, url, method='get', data=None, params=None, headers=None, content_type='application/json', app_name=''): self.url = url self.method = method self.params = params or {} self.result = None if not isinstance(headers, dict): headers = {} self.headers = CaseInsensitiveDict(headers) self.headers['Content-Type'] = content_type if data is None: data = {} self.data = json.dumps(data) if 'User-Agent' not in self.headers: if app_name: self.headers['User-Agent'] = _USER_AGENT + '/' + app_name else: self.headers['User-Agent'] = _USER_AGENT
def test_guess_mime_type(): # Can guess from URL extension assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.jpg'})) assert 'image/png' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.png'})) assert 'application/pdf' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.pdf'})) # Can guess from content type if extension is missing assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({ 'Location': 'http://cdn.rets.com/1', 'Content-Type': 'image/jpeg', })) # Can guess from content type assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg'})) assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg;charset=US-ASCII'})) assert None == _guess_mime_type(CaseInsensitiveDict({'Content-Type': ''})) assert None == _guess_mime_type(CaseInsensitiveDict())
def handle_response(cls, response_code, response_data): """ Handles service response. Returns the message, status or workflow id, if applicable """ client_error = response_code >= 400 and response_code < 500 success = response_code >= 200 and response_code < 300 workflow = CaseInsensitiveDict({"Message":"", "Id": 0, "Status": 0}) if client_error or success: try: response_content = json.loads(response_data) workflow.update(response_content) except ValueError: workflow["Message"] = "Invalid response content" else: workflow["Message"] = "{0}: Internal server error".format(response_code) return workflow
def __init__(self, response): super(SanitizedResponse, self).__init__() self.status_code = response.status_code self.encoding = response.encoding self.raw = response.raw self.reason = response.reason self.url = response.url self.request = response.request self.connection = response.connection self._content_consumed = True self._content = "" self.cookies = cookiejar_from_dict({}) self.headers = CaseInsensitiveDict() self.headers['content-length'] = '0' for header in ('date', 'server'): if header in response.headers: self.headers[header] = response.headers[header]
def __init__(self, flow_expiration=60 * 15, flow_cache_size=1000): """ Create a new access manager for managing authorizations. Parameters ---------- flow_expiration: int The number of seconds to keep pending authorization flows in memory flow_cache_size: int The maximum number of pending authorization flows to keep in memory at a time """ self.store = None self.endpoints = CaseInsensitiveDict() self.state_cache = TTLCache(maxsize=flow_cache_size, ttl=flow_expiration) # type: Dict[Tuple[str, str], Tuple[Endpoint, str, str]] self.random = random.SystemRandom() self.redirect_uri = "http://localhost/oauth2/callback/"
def __init__(self, method, url, data=None, params=None, headers=None, app_name=''): self.method = method self.url = url self.data = _convert_request_body(data) self.params = params or {} if not isinstance(headers, CaseInsensitiveDict): self.headers = CaseInsensitiveDict(headers) else: self.headers = headers # tell requests not to add 'Accept-Encoding: gzip, deflate' by default if 'Accept-Encoding' not in self.headers: self.headers['Accept-Encoding'] = None if 'User-Agent' not in self.headers: if app_name: self.headers['User-Agent'] = _USER_AGENT + '/' + app_name else: self.headers['User-Agent'] = _USER_AGENT
def __deserialize_requests_response(self, to_deserialize): """ Create and return a Response based on the contents of the given dictionary. :param to_deserialize: The dictionary to create the Response from. :return: A Response created by the contents of to_deserialize. """ to_return = Response() to_return.status_code = to_deserialize["status_code"] to_return.headers = CaseInsensitiveDict(to_deserialize["headers"]) to_return.encoding = to_deserialize["encoding"] to_return._content = to_deserialize["content"] to_return._content_consumed = True to_return.reason = to_deserialize["reason"] to_return.url = to_deserialize["url"] to_return.request = self.decode(to_deserialize["request"]) return to_return
def prepare_response(self, cached): """Verify our vary headers match and construct a real urllib3 HTTPResponse object. """ # Special case the '*' Vary value as it means we cannot actually # determine if the cached response is suitable for this request. if "*" in cached.get("vary", {}): return body_raw = cached["response"].pop("body") headers = CaseInsensitiveDict(data=cached['response']['headers']) if headers.get('transfer-encoding', '') == 'chunked': headers.pop('transfer-encoding') cached['response']['headers'] = headers try: body = io.BytesIO(body_raw) except TypeError: # This can happen if cachecontrol serialized to v1 format (pickle) # using Python 2. A Python 2 str(byte string) will be unpickled as # a Python 3 str (unicode string), which will cause the above to # fail with: # # TypeError: 'str' does not support the buffer interface body = io.BytesIO(body_raw.encode('utf8')) return HTTPResponse( body=body, preload_content=False, **cached["response"] )
def prepare_request(self, request): cookies = request.cookies or {} # Bootstrap CookieJar. if not isinstance(cookies, cookielib.CookieJar): cookies = cookiejar_from_dict(cookies) # Merge with session cookies merged_cookies = merge_cookies( merge_cookies(RequestsCookieJar(), self.cookies), cookies) # Set environment's basic authentication if not explicitly set. # auth = request.auth # if self.trust_env and not auth and not self.auth: # auth = get_netrc_auth(request.url) p = PreparedRequest() p.prepare( method=request.method.upper(), url=request.url, files=request.files, data=request.data, json=request.json, headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict), params=merge_setting(request.params, self.params), auth=merge_setting(request.auth, self.auth), cookies=merged_cookies, hooks=merge_hooks(request.hooks, self.hooks), ) return p
def default_headers(): """ :rtype: requests.structures.CaseInsensitiveDict """ return CaseInsensitiveDict({ 'User-Agent': default_user_agent(), 'Accept-Encoding': ', '.join(('gzip', 'deflate')), 'Accept': '*/*', # 'Connection': 'keep-alive', 'Connection': 'close', })
def _parse_items(self): """Parse `args.items` into `args.headers`, `args.data`, `args.params`, and `args.files`. """ self.args.headers = CaseInsensitiveDict() self.args.data = ParamDict() if self.args.form else OrderedDict() self.args.files = OrderedDict() self.args.params = ParamDict() try: parse_items(items=self.args.items, headers=self.args.headers, data=self.args.data, files=self.args.files, params=self.args.params) except ParseError as e: if self.args.traceback: raise self.error(e.message) if self.args.files and not self.args.form: # `http url @/path/to/file` file_fields = list(self.args.files.keys()) if file_fields != ['']: self.error( 'Invalid file fields (perhaps you meant --form?): %s' % ','.join(file_fields)) fn, fd = self.args.files[''] self.args.files = {} self._body_from_file(fd) if 'Content-Type' not in self.args.headers: mime, encoding = mimetypes.guess_type(fn, strict=False) if mime: content_type = mime if encoding: content_type = '%s; charset=%s' % (mime, encoding) self.args.headers['Content-Type'] = content_type
def __init__(self, data=None, tfs=None): super().__init__(data, tfs) self._fields = CaseInsensitiveDict(self._data['fields']) self._system_prefix = 'System.' self.id = self._data['id']
def __init__(self, content, encoding): self.encoding = encoding headers = {} # Split into header section (if any) and the content if b'\r\n\r\n' in content: first, self.content = _split_on_find(content, b'\r\n\r\n') if first != b'': headers = _header_parser(first.lstrip(), encoding) else: raise ImproperBodyPartContentException( 'content does not contain CR-LF-CR-LF' ) self.headers = CaseInsensitiveDict(headers)
def __init__(self, status_code=None, url=None, orig_url=None, headers=CaseInsensitiveDict(), content='', cookies=None, error=None, traceback=None, save=None, js_script_result=None, time=0): if cookies is None: cookies = {} self.status_code = status_code self.url = url self.orig_url = orig_url self.headers = headers self.content = content self.cookies = cookies self.error = error self.traceback = traceback self.save = save self.js_script_result = js_script_result self.time = time
def rebuild_response(r): response = Response( status_code=r.get('status_code', 599), url=r.get('url', ''), headers=CaseInsensitiveDict(r.get('headers', {})), content=r.get('content', ''), cookies=r.get('cookies', {}), error=r.get('error'), traceback=r.get('traceback'), time=r.get('time', 0), orig_url=r.get('orig_url', r.get('url', '')), js_script_result=r.get('js_script_result'), save=r.get('save'), ) return response
def test_docstring_example(self): cid = CaseInsensitiveDict() cid['Accept'] = 'application/json' assert cid['aCCEPT'] == 'application/json' assert list(cid) == ['Accept']
def test_len(self): cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) cid['A'] = 'a' assert len(cid) == 2
def test_getitem(self): cid = CaseInsensitiveDict({'Spam': 'blueval'}) assert cid['spam'] == 'blueval' assert cid['SPAM'] == 'blueval'
def test_fixes_649(self): """__setitem__ should behave case-insensitively.""" cid = CaseInsensitiveDict() cid['spam'] = 'oneval' cid['Spam'] = 'twoval' cid['sPAM'] = 'redval' cid['SPAM'] = 'blueval' assert cid['spam'] == 'blueval' assert cid['SPAM'] == 'blueval' assert list(cid.keys()) == ['SPAM']
def test_delitem(self): cid = CaseInsensitiveDict() cid['Spam'] = 'someval' del cid['sPam'] assert 'spam' not in cid assert len(cid) == 0
def test_get(self): cid = CaseInsensitiveDict() cid['spam'] = 'oneval' cid['SPAM'] = 'blueval' assert cid.get('spam') == 'blueval' assert cid.get('SPAM') == 'blueval' assert cid.get('sPam') == 'blueval' assert cid.get('notspam', 'default') == 'default'
def test_update(self): cid = CaseInsensitiveDict() cid['spam'] = 'blueval' cid.update({'sPam': 'notblueval'}) assert cid['spam'] == 'notblueval' cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}) cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) assert len(cid) == 2 assert cid['foo'] == 'anotherfoo' assert cid['bar'] == 'anotherbar'
def test_update_retains_unchanged(self): cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) cid.update({'foo': 'newfoo'}) assert cid['bar'] == 'bar'
def test_iter(self): cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) keys = frozenset(['Spam', 'Eggs']) assert frozenset(iter(cid)) == keys
def test_equality(self): cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) assert cid == othercid del othercid['spam'] assert cid != othercid assert cid == {'spam': 'blueval', 'eggs': 'redval'} assert cid != object()
def test_lower_items(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) lowerkeyset = frozenset(['accept', 'user-agent']) assert keyset == lowerkeyset
def test_preserve_key_case(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) keyset = frozenset(['Accept', 'user-Agent']) assert frozenset(i[0] for i in cid.items()) == keyset assert frozenset(cid.keys()) == keyset assert frozenset(cid) == keyset
def test_preserve_last_key_case(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) cid.update({'ACCEPT': 'application/json'}) cid['USER-AGENT'] = 'requests' keyset = frozenset(['ACCEPT', 'USER-AGENT']) assert frozenset(i[0] for i in cid.items()) == keyset assert frozenset(cid.keys()) == keyset assert frozenset(cid) == keyset
def test_copy(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) cid_copy = cid.copy() assert cid == cid_copy cid['changed'] = True assert cid != cid_copy
def build_response(self): request = self.calls[-1].args[0] r = requests.Response() try: r.status_code = int(self.redirects.pop(0)) except IndexError: r.status_code = 200 r.headers = CaseInsensitiveDict({'Location': '/'}) r.raw = self._build_raw() r.request = request return r
def parse_http_equiv_headers(html: str) -> CaseInsensitiveDict: http_equiv_headers = CaseInsensitiveDict() # Try to parse the HTML try: soup = bs(html, 'html.parser') except: return http_equiv_headers # Find all the meta tags metas = soup.find_all('meta') for meta in metas: if meta.has_attr('http-equiv') and meta.has_attr('content'): # Add support for multiple CSP policies specified via http-equiv # See issue: https://github.com/mozilla/http-observatory/issues/266 # Note that this is so far only done for CSP and not for other types # of http-equiv if (meta.get('http-equiv', '').lower().strip() == 'content-security-policy' and 'Content-Security-Policy' in http_equiv_headers): http_equiv_headers['Content-Security-Policy'] += '; ' + meta.get('content') else: http_equiv_headers[meta.get('http-equiv')] = meta.get('content') # Technically not HTTP Equiv, but I'm treating it that way elif meta.get('name', '').lower().strip() == 'referrer' and meta.has_attr('content'): http_equiv_headers['Referrer-Policy'] = meta.get('content') return http_equiv_headers
def __init__(self, consumer_key=None, consumer_secret=None, params=None, launch_url=None, launch_headers=None): super(ToolProvider, self).__init__(consumer_key, consumer_secret, params=params) self.outcome_requests = [] self._last_outcome_request = None self.launch_url = launch_url self.launch_headers = launch_headers or CaseInsensitiveDict() if 'Content-Type' not in self.launch_headers: self.launch_headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
def __init__( self, headers, data=None, status_code=None, oauth_token=None, response=None): self.headers = CaseInsensitiveDict(headers) self.data = data self.status_code = status_code self.oauth_token = oauth_token self.response = response self.cookies = OrderedDict(response.cookies.items()) self.logger = get_logger(__name__)
def make_headers(self, headers=None, cookies=None): headers = CaseInsensitiveDict( isinstance(headers, dict) and headers or {}) if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' if self.oauth_token: headers['Authorization'] = 'oauth2: {}'.format(self.oauth_token) return dict(headers)
def format_event(event, context): output_context = { 'aws_request_id': context.aws_request_id, 'client_context': context.client_context, 'function_name': context.function_name, 'function_version': context.function_version, 'get_remaining_time_in_millis': context.get_remaining_time_in_millis(), 'invoked_function_arn': context.invoked_function_arn, 'log_group_name': context.log_group_name, 'log_stream_name': context.log_stream_name, 'memory_limit_in_mb': context.memory_limit_in_mb, } hashed_event = Hasher(event) request = { 'requested-at': datetime.datetime.utcnow().isoformat(), 'context': output_context, 'operation': hashed_event['operation'], 'requestor': hashed_event['parameters']['requestor'], 'body': hashed_event['parameters']['request']['body'], 'path': hashed_event['parameters']['request']['path'], 'querystring': hashed_event['parameters']['request']['querystring'], 'header': CaseInsensitiveDict( hashed_event['parameters']['request']['header']), 'gateway': hashed_event['parameters']['gateway'], } return request
def send(self, request, stream=None, timeout=None, verify=None, cert=None, proxies=None): parsed_url = urlparse.urlparse(request.url) # We only work for requests with a host of localhost if parsed_url.netloc.lower() != "localhost": raise InvalidURL("Invalid URL %r: Only localhost is allowed" % request.url) real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:]) pathname = url_to_path(real_url) resp = Response() resp.status_code = 200 resp.url = real_url stats = os.stat(pathname) modified = email.utils.formatdate(stats.st_mtime, usegmt=True) resp.headers = CaseInsensitiveDict({ "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain", "Content-Length": stats.st_size, "Last-Modified": modified, }) resp.raw = LocalFSResponse(open(pathname, "rb")) resp.close = resp.raw.close return resp
def serialize_patch(obj): """Convert objects into JSON structures.""" # Record class and module information for deserialization result = {'__class__': obj.__class__.__name__} try: result['__module__'] = obj.__module__ except AttributeError: pass # Convert objects to dictionary representation based on type if isinstance(obj, datetime.datetime): result['year'] = obj.year result['month'] = obj.month result['day'] = obj.day result['hour'] = obj.hour result['minute'] = obj.minute result['second'] = obj.second result['microsecond'] = obj.microsecond return result if isinstance(obj, StreamingBody): original_text = obj.read() # We remove a BOM here if it exists so that it doesn't get reencoded # later on into a UTF-16 string, presumably by the json library result['body'] = original_text.decode('utf-8-sig') obj._raw_stream = BytesIO(original_text) obj._amount_read = 0 return result if isinstance(obj, CaseInsensitiveDict): result['as_dict'] = dict(obj) return result raise TypeError('Type not serializable')
def make_response(status_code: int = 200, content: bytes = b'', headers: dict = None, reason: str = None, encoding: str = None, ) -> Response: response = Response() response.status_code = status_code response._content = content response._content_consumed = True response.headers = CaseInsensitiveDict(headers or {}) response.encoding = encoding or get_encoding_from_headers(headers or {}) response.reason = reason return response
def _guess_mime_type(headers: CaseInsensitiveDict) -> Optional[str]: location = headers.get('location') if location: mime_type, _ = mimetypes.guess_type(location) if mime_type: return mime_type # Parse mime type from content-type header, e.g. 'image/jpeg;charset=US-ASCII' -> 'image/jpeg' mime_type, _ = cgi.parse_header(headers.get('content-type', '')) return mime_type or None
def _decode_headers(headers: CaseInsensitiveDict, encoding: str) -> CaseInsensitiveDict: return CaseInsensitiveDict({ k.decode(encoding): v.decode(encoding) for k, v in headers.items() })
def _pod_response_fixture(headers=None): mock_response = mock.create_autospec(requests.Response) headers = CaseInsensitiveDict({} if headers is None else headers) mock_response.headers = headers return mock_response
def __init__(self): self.status_code = None self.url = None self.orig_url = None self.headers = CaseInsensitiveDict() self.content = '' self.cookies = {} self.error = None self.save = None self.js_script_result = None self.time = 0
def rebuild_response(r): response = Response() response.status_code = r.get('status_code', 599) response.url = r.get('url', '') response.orig_url = r.get('orig_url', response.url) response.headers = CaseInsensitiveDict(r.get('headers', {})) response.content = r.get('content', '') response.cookies = r.get('cookies', {}) response.error = r.get('error') response.time_cost = r.get('time_cost', 0) response.js_script_result = r.get('js_script_result') return response