我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.httpclient.HTTPRequest()。
def search_machine(self, callback=None):
    """Fetch the cluster machine list and refresh the local machines cache.

    Requests ``<base_url><version_prefix>/machines`` and attaches
    ``_callback`` to the resulting future through ``self.ioloop.add_future``.

    :param callback: callable invoked with the fetch result once the machine
        list has been parsed and cached.
    """
    # NOTE(review): the source indentation was lost; the nesting below is a
    # reconstruction from the statement order -- confirm against the project.
    uri = self._base_url + self.version_prefix + '/machines'
    req = HTTPRequest(uri, self._MGET,
                      request_timeout=self.read_timeout,
                      follow_redirects=self.allow_redirect, )
    response_future = self.http.fetch(req, callback=lambda result: result)

    def _callback(fut):
        # Handles the outcome of the machines request: retry on another
        # cached server on failure, update the cache on success.
        exc = fut.exc_info()
        if exc:
            if not isinstance(exc[1], etcdexcept.EtcdException):
                # We can't get the list of machines, if one server is in the
                # machines cache, try on it
                _log.error("Failed to get list of machines from %s%s: %r and retry it.",
                           uri, self.version_prefix, exc)
                if self._machines_cache:
                    self._base_url = self._machines_cache.pop(0)
                    _log.debug("Retrying on %s", self._base_url)
                    # Call myself
                    # NOTE(review): search_machine() has no return statement
                    # in this view, so add_future receives None here, and the
                    # retry is started without a callback -- confirm.
                    self.ioloop.add_future(self.search_machine(), _callback)
                    return
                else:
                    raise etcdexcept.EtcdException("Could not get the list of servers, "
                                                   "maybe you provided the wrong "
                                                   "host(s) to connect to?")
        else:
            response = fut.result()
            # The body is a comma-separated list of machine URLs.
            machines = [
                node.strip() for node in
                self._handle_server_response(response).body.decode('utf-8').split(',')
            ]
            _log.debug("Retrieved list of machines: %s", machines)
            self._machines_cache = machines
            # Switch to a known machine if the current base URL is not listed.
            if self._base_url not in self._machines_cache:
                self._base_url = self._choice_machine()
            # NOTE(review): callback defaults to None but is called
            # unconditionally -- confirm callers always supply one.
            callback(fut.result())
    self.ioloop.add_future(response_future, _callback)
def api_execute(self, path, method, params=None, timeout=None):
    """Execute a form-encoded query against the current base URL.

    For GET/DELETE the parameters are appended to the query string; for
    PUT/POST they are URL-encoded into the request body.

    :param path: path appended to ``self._base_url``.
    :param method: one of ``self._MGET``, ``self._MDELETE``, ``self._MPUT``
        or ``self._MPOST``.
    :param params: optional request parameters; ``None`` is treated as empty.
    :param timeout: passed to ``HTTPRequest`` as ``request_timeout``.
    :returns: the result of ``self.http.fetch`` for the built request.
    :raises etcdexcept.EtcdException: if ``method`` is not supported.
    """
    url = self._base_url + path
    # Tolerate an unset cert_options so the .get() lookups below cannot fail.
    cert_options = self.cert_options or {}
    validate_cert = bool(cert_options)

    if method in (self._MGET, self._MDELETE):
        if params:
            url = url_concat(url, params)
        body = None
    elif method in (self._MPUT, self._MPOST):
        # urlencode(None) raises TypeError; an empty body is still a valid
        # PUT/POST payload.
        body = urlencode(params or {})
    else:
        raise etcdexcept.EtcdException(
            'HTTP method {} not supported'.format(method))

    request = HTTPRequest(url, method=method,
                          request_timeout=timeout,
                          headers=self._get_default_headers(method),
                          follow_redirects=self.allow_redirect,
                          body=body,
                          validate_cert=validate_cert,
                          ca_certs=cert_options.get('ca_certs', None),
                          client_key=cert_options.get('client_key', None),
                          client_cert=cert_options.get('client_cert', None),
                          auth_username=self.username,
                          auth_password=self.password)
    _log.debug("Request %s %s %s", path, method, request.body)
    return self.http.fetch(request)
def api_execute_json(self, path, method, params=None, timeout=None):
    """Send ``params`` as a JSON body to ``path`` on the current base URL.

    :param path: path appended to ``self._base_url``.
    :param method: HTTP method for the request.
    :param params: object serialised with ``json.dumps`` into the body.
    :param timeout: passed to ``HTTPRequest`` as ``request_timeout``.
    :returns: the result of ``self.http.fetch`` for the built request.
    """
    target = self._base_url + path
    payload = json.dumps(params)

    # Default headers for the method, with the content type forced to JSON.
    request_headers = self._get_default_headers(method)
    request_headers['Content-Type'] = 'application/json'

    # Certificates are validated only when certificate options are configured.
    verify = bool(self.cert_options)

    request = HTTPRequest(
        target,
        method=method,
        request_timeout=timeout,
        headers=request_headers,
        body=payload,
        follow_redirects=self.allow_redirect,
        validate_cert=verify,
        ca_certs=self.cert_options.get('ca_certs', None),
        client_key=self.cert_options.get('client_key', None),
        client_cert=self.cert_options.get('client_cert', None),
        auth_username=self.username,
        auth_password=self.password,
    )
    return self.http.fetch(request)
def request(method, url, params=None, data=None, context=None):
    """Perform a blocking HTTP request and wrap the outcome in ``tornado_response``.

    :param method: HTTP method name.
    :param url: target URL; ``http://`` is rewritten to ``https://`` when an
        SSL ``context`` is supplied.
    :param params: optional mapping appended to the URL as a query string.
    :param data: optional request body.
    :param context: optional SSL options passed to ``HTTPRequest``.
    :returns: a ``tornado_response`` built from the response (or from the
        HTTP error), or ``None`` if the client returned no response.
    """
    if params:
        # NOTE(review): keys and values are inserted verbatim (no
        # percent-encoding), exactly as before -- confirm callers pre-encode.
        url += '?' + '&'.join("%s=%s" % (key, params[key]) for key in params)
    if context is not None:
        url = url.replace('http://', 'https://', 1)

    http_client = httpclient.HTTPClient()
    try:
        http_request = httpclient.HTTPRequest(url=url, method=method,
                                              ssl_options=context, body=data)
        response = http_client.fetch(http_request)
    except httpclient.HTTPError as e:
        if e.response is None:
            return tornado_response(500, str(e))
        return tornado_response(e.response.code, e.response.body)
    finally:
        # Always release the client; previously it leaked whenever fetch raised.
        http_client.close()

    if response is None:
        return None
    return tornado_response(response.code, response.body)
def __init__(self, method=None, url=None, headers=None, files=None, data=None,
             params=None, auth=None, cookies=None, hooks=None, json=None,
             request=None):
    """Initialise from explicit fields, optionally seeded by another request.

    ``request`` may be a tornado ``HTTPRequest`` (converted through
    ``_transform_tornado_request``) or a ``_Request``; anything else is
    ignored. Explicit truthy arguments take precedence over the attributes
    of ``request``.
    """
    if isinstance(request, HTTPRequest):
        template = self._transform_tornado_request(request)
    elif isinstance(request, _Request):
        template = request
    else:
        template = None

    def choose(explicit, attr):
        # Prefer the explicit argument; fall back to the template attribute.
        return explicit or getattr(template, attr, None)

    _Request.__init__(
        self,
        method=choose(method, 'method'),
        url=choose(url, 'url'),
        headers=choose(headers, 'headers'),
        files=choose(files, 'files'),
        data=choose(data, 'data'),
        params=choose(params, 'params'),
        auth=choose(auth, 'auth'),
        cookies=choose(cookies, 'cookies'),
        hooks=choose(hooks, 'hooks'),
        json=choose(json, 'json'),
    )
def _transform_tornado_request(self, request):
    """Prepare connection state from a tornado ``HTTPRequest``.

    :param request: the tornado ``HTTPRequest`` to read the URL from.
    :raises ValueError: if ``request`` is not a tornado ``HTTPRequest``.
    """
    # The guard was inverted: it rejected exactly the type it requires.
    if not isinstance(request, HTTPRequest):
        raise ValueError('param tRequest should be '
                         'HTTPRequest instance from tornado package.')

    host, port = get_host_and_port(request.url)
    if re.match(r'^\[.*\]$', host):
        # raw ipv6 addresses in urls are enclosed in brackets
        host = host[1:-1]
    self.parsed_hostname = host  # save final host for _on_connect

    # NOTE(review): port, af and ssl_options are computed but never used in
    # the visible code, and nothing is returned even though __init__ uses the
    # return value -- the body looks truncated; confirm against the project.
    if request.allow_ipv6 is False:
        af = AF_INET
    else:
        af = AF_UNSPEC

    ssl_options = self._get_ssl_options(self.parsed.scheme)

    # Arm a timeout using the shorter of the connect and request timeouts.
    timeout = min(self.request.connect_timeout, self.request.request_timeout)
    if timeout:
        self._timeout = self.io_loop.add_timeout(
            self.start_time + timeout,
            stack_context.wrap(functools.partial(self._on_timeout,
                                                 "while connecting")))
def ping(self, path=None):
    """Issue a GET against the endpoint to check that it is reachable.

    A 307 response is converted into ``MasterRedirect`` carrying the network
    location of the ``location`` header; other ``HTTPError`` codes are
    ignored. Connection and other errors are logged and re-raised.

    :param path: optional path; defaults to ``self.api_path``.
    """
    target = self.endpoint + (path or self.api_path)
    request = HTTPRequest(
        url=target,
        method='GET',
        headers=self.headers,
        follow_redirects=False,
        request_timeout=100
    )
    try:
        yield self.outbound_client.fetch(request)
    except HTTPError as ex:  # pragma: no cover
        if ex.code == 307:
            redirect_target = urlparse(ex.response.headers['location']).netloc
            raise_from(MasterRedirect(redirect_target), None)
    except ConnectionRefusedError as ex:  # pragma: no cover
        log.debug("Problem reaching: %s", self.endpoint)
        raise ex
    except Exception as ex:  # pragma: no cover
        log.debug("Unhandled exception when connecting to %s", self.endpoint)
        raise ex
def _to_http_request(url, request):
    """Build a POST ``HTTPRequest`` for ``url`` from an RPC-style request.

    The request's own headers are converted with ``headers.to_http_headers``
    and then overlaid with the caller, service, encoding, TTL and procedure
    headers. Missing values become empty strings.
    """
    merged = headers.to_http_headers(request.headers or {})
    transport_headers = {
        headers.CALLER: request.caller or '',
        headers.SERVICE: request.service or '',
        headers.ENCODING: request.encoding or '',
        headers.TTL: '10000',  # TODO
        headers.PROCEDURE: request.procedure or '',
    }
    merged.update(transport_headers)

    return HTTPRequest(
        url=url,
        method='POST',
        headers=merged,
        body=request.body or '',
        # request_timeout=None, TODO
    )
def test_inbound_headers(http_headers, rpc_headers, http_resp_headers):
    # Starts an HTTPInbound with a handler that checks the headers it receives
    # and echoes them back, then verifies the HTTP response headers.

    class Handler(object):
        def handle(self, request):
            assert rpc_headers == request.headers
            return Response(headers=rpc_headers, body=request.body)

    inbound = HTTPInbound()
    inbound.start(Handler())

    http_client = AsyncHTTPClient()
    outgoing = HTTPRequest(
        url='http://localhost:%s' % inbound.port,
        method='POST',
        headers=http_headers,
        body='',
    )
    reply = yield http_client.fetch(outgoing)

    assert 200 == reply.code
    for name, expected in http_resp_headers.items():
        assert expected == reply.headers[name]
async def fetch_definitions(self, term):
    """Look up ``term`` and return the ``'list'`` entry of the JSON reply.

    Declared ``async`` because the body awaits the fetch; ``await`` inside a
    plain ``def`` is a syntax error.

    :param term: search term sent as the ``term`` query parameter.
    :returns: ``data['list']`` from the decoded response body.
    """
    url = url_concat(UrbanDictionary.DEFINE_URL, dict(term=term))
    request = HTTPRequest(
        url=url,
        headers={
            'X-Mashape-Key': self.api_key,
            'Accept': 'text/plain',
        },
    )
    # Bridge the tornado future into asyncio so it can be awaited.
    tornado_future = self.client.fetch(request)
    response = await to_asyncio_future(tornado_future)
    data = json_decode(response.body)
    return data['list']
async def access_token(self, code, state):
    """Exchange an authorization ``code`` for an access token.

    Declared ``async`` because the body awaits the fetch; ``await`` inside a
    plain ``def`` is a syntax error.

    :param code: authorization code sent in the form body.
    :param state: state value sent in the form body.
    :returns: ``data['access_token']`` from the decoded response body.
    """
    client = AsyncHTTPClient()
    payload = (
        ('client_id', self.client_id),
        ('client_secret', self.client_secret),
        ('grant_type', 'authorization_code'),
        ('redirect_uri', Twitch.REDIRECT_URI),
        ('code', code),
        ('state', state),
    )
    request = HTTPRequest(
        url=Twitch.TOKEN_URL,
        method='POST',
        body=urlencode(payload),
    )
    # Bridge the tornado future into asyncio so it can be awaited.
    tornado_future = client.fetch(request)
    response = await to_asyncio_future(tornado_future)
    data = json_decode(response.body)
    return data['access_token']
def _request(self, url_, method_, params=None, data=None, **kwargs):
    """Run a request through Tornado's async HTTP client.

    The response is delivered through an ``asyncio.Future`` bound to
    ``self._asyncio_loop`` and then passed to ``self._result``.
    """
    full_url = utils.merge_url(url_, params)
    payload = utils.encode_body(data)
    options = utils.norm_tornado_kwargs(**kwargs)
    request = HTTPRequest(full_url, method=method_, body=payload, **options)

    # The fetch callback resolves this future with the response.
    pending = asyncio.Future(loop=self._asyncio_loop)

    def deliver(response):
        pending.set_result(response)

    try:
        self._client.fetch(request, deliver)
    except Exception as e:
        pending.set_exception(e)

    # Wrap result
    return self._result((yield from pending))
def is_running(self):
    '''Check if our proxied process is still running.

    Returns False when no process is recorded in ``self.state``, when its
    ``poll()`` reports 0, or when an HTTP request to ``localhost:<port>``
    fails; True otherwise. The function yields on the fetch, so it is a
    generator (presumably wrapped by a coroutine decorator outside this view).
    '''
    if 'proc' not in self.state:
        return False

    # Check if the process is still around
    proc = self.state['proc']
    # NOTE(review): only an exit status of 0 is treated as "gone"; a process
    # that exited with a non-zero status passes this check -- confirm intent.
    if proc.proc.poll() == 0:
        self.log.info('Cannot poll on process.')
        return False

    client = httpclient.AsyncHTTPClient()
    req = httpclient.HTTPRequest('http://localhost:{}'.format(self.port))
    try:
        yield client.fetch(req)
        self.log.debug('Got positive response from rstudio server')
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt, SystemExit
        # and GeneratorExit are no longer swallowed.
        self.log.debug('Got negative response from rstudio server')
        return False
    return True
def post_message(msg, endpoint, channel, username='BBTornado',
                 unfurl_links=False, icon=":robot_face:"):
    """Post a message on slack without waiting for the outcome.

    The JSON payload is POSTed to ``endpoint`` through a callback spawned on
    the current IOLoop with ``raise_error=False``; nothing is returned.
    """
    payload = {
        'icon_emoji': icon,
        'text': msg,
        'username': username,
        'unfurl_links': unfurl_links,
        'channel': channel,
    }
    http_client = AsyncHTTPClient()
    outgoing = HTTPRequest(
        endpoint,
        method='POST',
        headers={'Content-Type': 'application/json'},
        body=json.dumps(payload),
    )
    loop = IOLoop.current()
    loop.spawn_callback(http_client.fetch, outgoing, raise_error=False)
def test_instance_reload(self):
    # Creates an instance with a 4 second reload interval, waits 5 seconds,
    # and checks that the reported reload_time has changed.
    create_request = HTTPRequest(
        url=self.get_url('/api/tensorboard'),
        method='POST',
        body=json.dumps({"logdir": self.log_dir, "reload_interval": 4}),
        headers={"Content-Type": "application/json"})
    created = yield self.http_client.fetch(create_request)
    instance = json.loads(created.body.decode())
    assert instance is not None

    name = instance["name"]
    first_reload = instance["reload_time"]

    # Blocking wait, longer than the reload interval.
    time.sleep(5)

    status_request = HTTPRequest(
        url=self.get_url('/api/tensorboard/{}'.format(name)))
    status = yield self.http_client.fetch(status_request)
    refreshed = json.loads(status.body.decode())
    assert refreshed["reload_time"] != first_reload
def request_for_batch(host, port, connect_timeout, request_timeout,
                      schema_cache, topic, batch):
    """Build (but do not send) the Tornado HTTPRequest for a message batch.

    The request POSTs the encoded batch to ``<host>:<port>/topics/<topic>``
    and carries the topic, the batch and a fresh UUID as private attributes.
    """
    url = '{0}:{1}/topics/{2}'.format(host, port, topic)
    kafka_headers = {
        'Accept': 'application/vnd.kafka.v1+json',
        'Content-Type': 'application/vnd.kafka.avro.v1+json',
    }
    request = HTTPRequest(url,
                          connect_timeout=connect_timeout,
                          request_timeout=request_timeout,
                          method='POST',
                          headers=kafka_headers,
                          body=_encode_payload(schema_cache, topic, batch))

    # Keep the topic and batch on the request so they are available when the
    # response is handled: individual messages can fail even when the overall
    # request succeeds.
    request._topic = topic
    request._batch = batch

    # A unique ID lets us track in-flight requests and the events in them.
    request._id = uuid4()
    return request
async def sendRedPackage(self, openid, amount):
    """Send a red package of ``amount`` to ``openid`` and record the outcome.

    Declared ``async`` because the body awaits database and HTTP calls;
    ``await`` inside a plain ``def`` is a syntax error.

    :param openid: recipient identifier, also used as the log/order key.
    :param amount: amount passed to ``generateWXParam``; orders are recorded
        as ``amount / 100.0``.
    """
    await self.db.add_log(openid, "try send RedPackage", "retry_Time :" + "none")
    url = "https://api.mch.weixin.qq.com/mmpaymkttransfers/sendredpack"
    strr = generateWXParam(openid, amount)
    request = HTTPRequest(url=url, method="POST", body=strr,
                          client_key="/home/coco/cert/apiclient_key.pem",
                          ca_certs="/home/coco/cert/rootca.pem",
                          client_cert="/home/coco/cert/apiclient_cert.pem")
    client = AsyncHTTPClient()
    try:
        response = await client.fetch(request)
        res = parseWeixin(response.body.decode('utf-8'))
        await self.db.add_log(openid, "send RedPackage response", res)
        if res['return_code'] == 'SUCCESS' and res['result_code'] == 'SUCCESS':
            config.hasSent[config.turn - 1][openid] = amount / 100.0
            await self.db.add_order(openid, config.turn, amount / 100.0, "Sent")
        else:
            config.sendPackageResponseError += 1
            await self.db.add_order(openid, config.turn, amount / 100.0, "NotSent")
    except Exception:
        # Best-effort: any failure is recorded in the log and counted.
        await self.db.add_log(openid, "send RedPackage response",
                              "redpackage Callback failed")
        config.sendPackageError += 1
def async_fetch(url, headers=None, method="GET", data=None, follow_redirects=False):
    """
    Async http fetch

    Generator that yields the fetch future and delivers the response through
    ``gen.Return``.

    :param url: target URL
    :param headers: optional mapping of request headers (not modified)
    :param method: HTTP method; ``data`` is only used for GET and POST
    :param data: optional mapping; query string for GET, form body for POST
    :param follow_redirects: passed through to ``HTTPRequest``
    """
    # Local import: the blocking HTTPClient used before returns a response,
    # not a future, so it cannot be yielded from a coroutine.
    from tornado.httpclient import AsyncHTTPClient

    client = AsyncHTTPClient()
    # Copy so the caller's dict is not mutated by the Content-Type update.
    headers = dict(headers) if headers else {}
    body = None
    if method == "GET" and data is not None:
        url = url + '?' + urlencode(data)
    elif method == "POST" and data is not None:
        headers.update({'Content-Type': 'application/x-www-form-urlencoded'})
        body = urlencode(data)
    request = HTTPRequest(url=url, headers=headers, method=method,
                          follow_redirects=follow_redirects, body=body)
    response = yield client.fetch(request, raise_error=False)
    # return response
    raise gen.Return(response)
def raw_fetch(self, headers, body):
    # Sends hand-built header lines plus a Content-Length header and the body
    # over a RawRequestHTTPConnection, then returns the response after
    # re-raising any error it carries. Resolver and client are closed on exit.
    with closing(Resolver(io_loop=self.io_loop)) as resolver:
        with closing(SimpleAsyncHTTPClient(self.io_loop,
                                           resolver=resolver)) as client:
            proxied_request = httpclient._RequestProxy(
                httpclient.HTTPRequest(self.get_url("/")),
                dict(httpclient.HTTPRequest._DEFAULTS))
            conn = RawRequestHTTPConnection(
                self.io_loop, client, proxied_request,
                None, self.stop,
                1024 * 1024, resolver)

            length_line = utf8("Content-Length: %d\r\n" % len(body))
            raw_request = b"\r\n".join(headers + [length_line]) + b"\r\n" + body
            conn.set_request(raw_request)

            response = self.wait()
            response.rethrow()
            return response
def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None):
    """Client-side websocket support.

    Accepts a url (or an ``HTTPRequest``) and returns a Future that resolves
    to a `WebSocketClientConnection`. If ``callback`` is given it is attached
    to that Future on the IOLoop.

    .. versionchanged:: 3.2
       Also accepts ``HTTPRequest`` objects in place of urls.
    """
    loop = IOLoop.current() if io_loop is None else io_loop

    if not isinstance(url, httpclient.HTTPRequest):
        request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout)
    else:
        assert connect_timeout is None
        request = url
        # Copy and convert the headers dict/object (see comments in
        # AsyncHTTPClient.fetch)
        request.headers = httputil.HTTPHeaders(request.headers)

    proxied = httpclient._RequestProxy(
        request, httpclient.HTTPRequest._DEFAULTS)
    conn = WebSocketClientConnection(loop, proxied)
    if callback is not None:
        loop.add_future(conn.connect_future, callback)
    return conn.connect_future
def login():
    """Log in to the server and store the returned session ID on ``client``.

    ``client.sessionID`` is cleared first and only set again when the server
    replies with ``ret_value == 0``.
    """
    logging.debug("login: Start to login.")
    client.sessionID = None

    loginInfo = {
        "token1": hashlib.md5(client.token).hexdigest(),
        "token2": hashlib.md5(client.boxID).hexdigest(),
        "boxid": client.boxID,
    }
    headers, body = createFormData(loginInfo)
    url = client.protocol + client.serverHostPort + "/login"
    login_request = HTTPRequest(url=url, method="POST", headers=headers, body=body)
    logging.debug("login: Login info = %s", str(loginInfo))

    response = yield client.http_client.fetch(login_request)
    if response.error:
        logging.error("login: Failed to login. error=%s", response.error)
        return

    logging.debug("login: Login result. response.body=%r", response.body)
    loginRes = json_decode(response.body)
    if loginRes["ret_value"] != 0:
        logging.error("login: Failed to login. ret_value=%d", loginRes["ret_value"])
        return

    client.sessionID = loginRes["sessionID"]
    logging.info("login: Succeed to login. sessionID=%s", loginRes["sessionID"])
    return
def handleOperate(cmd):
    """Forward an operate command to the local ``/command`` endpoint.

    :param cmd: mapping with at least ``type`` and ``boxid``; the ``type``
        key is removed from it and the remainder is sent as JSON ``data``.
    """
    logging.info("handleOperate: Start to handle operate. cmd=%s", str(cmd))
    url = client.protocol + client.localHostPort + "/command"
    command = {"type": cmd['type'],
               "boxid": cmd['boxid'],
               "data": "",
               }
    # Note: this removes 'type' from the caller's dict (behaviour kept).
    del cmd['type']
    command['data'] = json.dumps(cmd)
    headers, body = createFormData(command)

    # The timeout belongs on the HTTPRequest: fetch() does not apply keyword
    # arguments when it is handed an HTTPRequest object.
    operate_request = HTTPRequest(url=url, method="POST", headers=headers,
                                  body=body, request_timeout=60)
    response = yield client.http_client.fetch(operate_request)
    if response.error:
        logging.error("handleOperate: Failed to send operate. error=%s", response.error)
    else:
        logging.info("handleOperate: Operate result. response.body=%r", response.body)
        res = json_decode(response.body)
        if res["ret_value"] != 0:  # need login
            # The old format string had a single placeholder, so ret_value
            # landed in the [%s] slot and "ret_val=" was printed empty.
            logging.warning(
                "handleOperate: Failed to execute [%s] operation. ret_val=%s",
                command['type'], res["ret_value"])
    logging.info("handleOperate: End to handle operate.")
def handleTest(cmd):
    """Forward a test command to the local ``/test`` endpoint.

    :param cmd: mapping with at least ``type``; the ``type`` key is removed
        from it and the remainder is sent as JSON ``data``.
    """
    logging.info("handleTest: Start to handle test. cmd=%s", str(cmd))
    url = client.protocol + client.localHostPort + "/test"
    test_comand = {'type': cmd['type'],
                   'data': "",
                   }
    # Note: this removes 'type' from the caller's dict (behaviour kept).
    del cmd['type']
    test_comand['data'] = json.dumps(cmd)
    headers, body = createFormData(test_comand)

    # The timeout belongs on the HTTPRequest: fetch() does not apply keyword
    # arguments when it is handed an HTTPRequest object.
    operate_request = HTTPRequest(url=url, method="POST", headers=headers,
                                  body=body, request_timeout=60)
    response = yield client.http_client.fetch(operate_request)
    if response.error:
        logging.error("handleTest: Failed to send test. error=%s", response.error)
    else:
        logging.info("handleTest: Test result. response.body=%r", response.body)
        res = json_decode(response.body)
        if res["ret_value"] != 0:  # need login
            # The old format string had a single placeholder, so ret_value
            # landed in the [%s] slot and "ret_val=" was printed empty.
            logging.warning(
                "handleTest: Failed to execute [%s] test. ret_val=%s",
                test_comand['type'], res["ret_value"])
    logging.info("handleTest: End to handle test.")
def handleProfile(cmd):
    """Forward a printer profile to the local ``/profile`` endpoint.

    :param cmd: mapping providing ``profile`` and ``token``.
    """
    logging.info("handleProfile: Start to handle profile. cmd=%s", str(cmd))
    url = client.protocol + client.localHostPort + "/profile"
    profile_data = {'printer_profile': cmd['profile'],
                    'token': cmd['token'],
                    }
    headers, body = createFormData(profile_data)

    # The timeout belongs on the HTTPRequest: fetch() does not apply keyword
    # arguments when it is handed an HTTPRequest object.
    operate_request = HTTPRequest(url=url, method="POST", headers=headers,
                                  body=body, request_timeout=60)
    response = yield client.http_client.fetch(operate_request)
    if response.error:
        logging.error("handleProfile: Failed to handle profile. error=%s", response.error)
    else:
        logging.info("handleProfile: handle profile result. response.body=%r", response.body)
        res = json_decode(response.body)
        if res["ret_value"] != 0:  # need login
            # logging.warn is a deprecated alias of logging.warning.
            logging.warning("handleProfile: Failed to handle profile. ret_val=%s",
                            res["ret_value"])
    logging.info("handleProfile: End to handle profile.")
def handleUpdate(cmd):
    """Send the requested update version to the ``/netupdate_ajax`` endpoint.

    :param cmd: mapping providing ``version``.
    """
    logging.info("handleUpdate: Start to handle update box. cmd=%s", str(cmd))
    update_version = cmd["version"]

    headers, body = createFormData({'version': update_version})
    url = client.protocol + client.updateHostPort + "/netupdate_ajax"
    update_request = HTTPRequest(url=url, method="POST", headers=headers, body=body)
    logging.debug("handleUpdate: Send to update box. version=%s", update_version)

    response = yield client.http_client.fetch(update_request)
    if response.error:
        logging.error("handleUpdate: Failed to send to update. error=%s", response.error)
        return

    logging.debug("handleUpdate: result. response.body=%r", response.body)
    # The decoded body itself (not a field of it) is compared against 0.
    updateRes = json_decode(response.body)
    if updateRes != 0:
        logging.error("handleUpdate: Failed to send. ret_value=%d", updateRes)
    else:
        logging.info("handleUpdate: Succeed to send.")
    return
def handleFile(cmd):
    """Forward a file command to the local ``/delete`` or ``/rename`` endpoint.

    :param cmd: mapping with a ``type`` key (``"1"`` selects delete, anything
        else rename); ``type`` is removed and the rest is sent as form data.
    """
    logging.info("handleFile: Start to handle operate. cmd=%s", str(cmd))
    file_op = cmd["type"]
    if file_op == "1":
        url = client.protocol + client.localHostPort + "/delete"
    else:
        url = client.protocol + client.localHostPort + "/rename"
    # Note: this removes 'type' from the caller's dict (behaviour kept).
    del cmd["type"]
    headers, body = createFormData(cmd)

    # The timeout belongs on the HTTPRequest: fetch() does not apply keyword
    # arguments when it is handed an HTTPRequest object.
    operate_request = HTTPRequest(url=url, method="POST", headers=headers,
                                  body=body, request_timeout=60)
    response = yield client.http_client.fetch(operate_request)
    if response.error:
        logging.error("handleFile: Failed to send operate. error=%s", response.error)
    else:
        logging.info("handleFile: Operate result. response.body=%r", response.body)
        res = json_decode(response.body)
        if res["ret_value"] != 0:  # need login
            # The old format string had a single placeholder, so ret_value
            # landed in the [%s] slot and "ret_val=" was printed empty.
            logging.warning(
                "handleFile: Failed to execute [%s] operation. ret_val=%s",
                file_op, res["ret_value"])
def poll(self):
    """Poll for and return the raw status data provided by the Presto REST API.

    The value is delivered through ``Return`` (coroutine style).

    :returns: dict -- JSON status information or ``None`` if the query is done
    :raises: ``ProgrammingError`` when no query has been started

    .. note::
        This is not a part of DB-API.
    """
    if self._state == self._STATE_NONE:
        raise ProgrammingError("No query yet")

    if self._nextUri is None:
        assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
        raise Return(None)

    status_request = HTTPRequest(self._nextUri)
    http_client = AsyncHTTPClient(max_clients=512)
    status_response = yield http_client.fetch(status_request)
    self._process_response(status_response)
    raise Return(json_decode(status_response.body))
def raw_fetch(self, headers, body):
    # Sends hand-built header lines plus a Content-Length header and the body
    # over a RawRequestHTTPConnection and returns the response after
    # re-raising any error it carries.
    client = SimpleAsyncHTTPClient(self.io_loop)
    resolver = Resolver(io_loop=self.io_loop)
    try:
        conn = RawRequestHTTPConnection(
            self.io_loop, client,
            httpclient._RequestProxy(
                httpclient.HTTPRequest(self.get_url("/")),
                dict(httpclient.HTTPRequest._DEFAULTS)),
            None, self.stop,
            1024 * 1024, resolver)
        conn.set_request(
            b"\r\n".join(headers +
                         [utf8("Content-Length: %d\r\n" % len(body))]) +
            b"\r\n" + body)
        response = self.wait()
    finally:
        # Release both resources even when wait() raises (e.g. on timeout);
        # previously the client leaked on error and the resolver always did.
        client.close()
        resolver.close()
    response.rethrow()
    return response
def send_email(to, subject, html):
    """POST an e-mail to the Mailgun API; HTTP failures are logged, not raised.

    ``unicode`` arguments are encoded to UTF-8 before being form-encoded.
    """
    def as_utf8(value):
        # Only unicode objects are encoded; other values pass through.
        return value.encode('utf-8') if isinstance(value, unicode) else value

    to = as_utf8(to)
    subject = as_utf8(subject)
    html = as_utf8(html)

    form = urlencode({
        'from': CONFIG.EMAIL_SENDER,
        'to': to,
        'subject': subject,
        'html': html
    })
    request = HTTPRequest(
        url=_MAILGUN_API_URL,
        method='POST',
        auth_username='api',
        auth_password=CONFIG.MAILGUN_API_KEY,
        body=form
    )
    client = AsyncHTTPClient()
    try:
        yield client.fetch(request)
    except HTTPError as e:
        # The error may not carry a response; fall back to None in that case.
        try:
            response = e.response.body
        except AttributeError:
            response = None
        logging.exception('failed to send email:\nto: %s\nsubject: %s\nhtml: %s\nresponse: %s', to, subject, html, response)
def test_websocket_headers(self):
    # Ensure that arbitrary headers can be passed through websocket_connect.
    target = 'ws://127.0.0.1:%d/header' % self.get_http_port()
    request = HTTPRequest(target, headers={'X-Test': 'hello'})
    ws = yield websocket_connect(request)
    reply = yield ws.read_message()
    self.assertEqual(reply, 'hello')
    yield self.close(ws)
def test_check_origin_valid_no_path(self):
    # An Origin of the server's own host and port (no path) is accepted and
    # the echo round-trip works.
    port = self.get_http_port()
    request = HTTPRequest('ws://127.0.0.1:%d/echo' % port,
                          headers={'Origin': 'http://127.0.0.1:%d' % port})

    ws = yield websocket_connect(request, io_loop=self.io_loop)
    ws.write_message('hello')
    echoed = yield ws.read_message()
    self.assertEqual(echoed, 'hello')
    yield self.close(ws)
def test_check_origin_valid_with_path(self):
    # An Origin of the server's own host and port plus a path is accepted and
    # the echo round-trip works.
    port = self.get_http_port()
    request = HTTPRequest('ws://127.0.0.1:%d/echo' % port,
                          headers={'Origin': 'http://127.0.0.1:%d/something' % port})

    ws = yield websocket_connect(request, io_loop=self.io_loop)
    ws.write_message('hello')
    echoed = yield ws.read_message()
    self.assertEqual(echoed, 'hello')
    yield self.close(ws)
def test_check_origin_invalid_partial_url(self):
    # An Origin without a scheme must be rejected with a 403.
    port = self.get_http_port()
    request = HTTPRequest('ws://127.0.0.1:%d/echo' % port,
                          headers={'Origin': '127.0.0.1:%d' % port})

    with self.assertRaises(HTTPError) as cm:
        yield websocket_connect(request, io_loop=self.io_loop)
    self.assertEqual(cm.exception.code, 403)
def test_check_origin_invalid(self):
    # Host is 127.0.0.1, which should not be accessible from some other
    # domain
    port = self.get_http_port()
    request = HTTPRequest('ws://127.0.0.1:%d/echo' % port,
                          headers={'Origin': 'http://somewhereelse.com'})

    with self.assertRaises(HTTPError) as cm:
        yield websocket_connect(request, io_loop=self.io_loop)
    self.assertEqual(cm.exception.code, 403)
def test_prepare_curl_callback_stack_context(self):
    # Checks that an exception raised by prepare_curl_callback reaches the
    # handler of the ExceptionStackContext the request was created in.
    # NOTE(review): source indentation was lost; this layout assumes only the
    # HTTPRequest construction sits inside the ``with`` block -- confirm.
    exc_info = []

    def error_handler(typ, value, tb):
        # Record the exception and stop the wait() below; returning True
        # marks the exception as handled.
        exc_info.append((typ, value, tb))
        self.stop()
        return True

    with ExceptionStackContext(error_handler):
        # The callback deliberately raises ZeroDivisionError.
        request = HTTPRequest(self.get_url('/'),
                              prepare_curl_callback=lambda curl: 1 / 0)
    self.http_client.fetch(request, callback=self.stop)
    self.wait()
    # Exactly one exception, of the expected type, must have been captured.
    self.assertEqual(1, len(exc_info))
    self.assertIs(exc_info[0][0], ZeroDivisionError)
def test_reuse_request_from_response(self):
    # The response.request attribute should be an HTTPRequest, not
    # a _RequestProxy.
    # This test uses self.http_client.fetch because self.fetch calls
    # self.get_url on the input unconditionally.
    url = self.get_url('/hello')
    first = yield self.http_client.fetch(url)
    original_request = first.request
    self.assertEqual(original_request.url, url)
    self.assertTrue(isinstance(original_request, HTTPRequest))

    # Fetching with the returned request object must work again.
    second = yield self.http_client.fetch(first.request)
    self.assertEqual(second.body, b'Hello world!')
def test_request_set(self):
    # A value set on the request is visible through the proxy.
    request = HTTPRequest('http://example.com/', user_agent='foo')
    proxy = _RequestProxy(request, {})
    self.assertEqual(proxy.user_agent, 'foo')
def test_default_set(self):
    # A value present only in the defaults is visible through the proxy.
    request = HTTPRequest('http://example.com/')
    defaults = {'network_interface': 'foo'}
    proxy = _RequestProxy(request, defaults)
    self.assertEqual(proxy.network_interface, 'foo')
def test_both_set(self):
    # When both the request and the defaults set a value, the request wins.
    request = HTTPRequest('http://example.com/', proxy_host='foo')
    defaults = {'proxy_host': 'bar'}
    proxy = _RequestProxy(request, defaults)
    self.assertEqual(proxy.proxy_host, 'foo')
def test_bad_attribute(self):
    # Accessing an unknown attribute through the proxy raises AttributeError.
    request = HTTPRequest('http://example.com/')
    proxy = _RequestProxy(request, {})
    with self.assertRaises(AttributeError):
        proxy.foo
def test_defaults_none(self):
    # With no defaults mapping at all, an unset attribute reads as None.
    request = HTTPRequest('http://example.com/')
    proxy = _RequestProxy(request, None)
    self.assertIs(proxy.auth_username, None)
def test_str(self):
    # str() of a response starts with the class name and includes the code.
    request = HTTPRequest('http://example.com')
    response = HTTPResponse(request, 200, headers={}, buffer=BytesIO())
    text = str(response)
    self.assertTrue(text.startswith('HTTPResponse('))
    self.assertIn('code=200', text)
def test_headers(self):
    # Headers passed to the constructor are readable from the request.
    supplied = {'foo': 'bar'}
    request = HTTPRequest('http://example.com', headers=supplied)
    self.assertEqual(request.headers, {'foo': 'bar'})
def test_headers_setter(self):
    # Headers assigned after construction are readable from the request.
    request = HTTPRequest('http://example.com')
    replacement = {'bar': 'baz'}
    request.headers = replacement
    self.assertEqual(request.headers, {'bar': 'baz'})
def test_body(self):
    # A body passed to the constructor reads back equal to utf8('foo').
    request = HTTPRequest('http://example.com', body='foo')
    expected = utf8('foo')
    self.assertEqual(request.body, expected)
def test_body_setter(self):
    # A body assigned after construction reads back equal to utf8('foo').
    request = HTTPRequest('http://example.com')
    request.body = 'foo'
    expected = utf8('foo')
    self.assertEqual(request.body, expected)
def test_if_modified_since(self):
    # if_modified_since is exposed as a formatted If-Modified-Since header.
    http_date = datetime.datetime.utcnow()
    request = HTTPRequest('http://example.com', if_modified_since=http_date)
    expected_headers = {'If-Modified-Since': format_timestamp(http_date)}
    self.assertEqual(request.headers, expected_headers)