我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.httputil()。
def set_status(self, status_code, reason=None): """Sets the status code for our response. :arg int status_code: Response status code. If ``reason`` is ``None``, it must be present in `httplib.responses <http.client.responses>`. :arg string reason: Human-readable reason phrase describing the status code. If ``None``, it will be filled in from `httplib.responses <http.client.responses>`. """ self._status_code = status_code if reason is not None: self._reason = escape.native_str(reason) else: try: self._reason = httputil.responses[status_code] except KeyError: raise ValueError("unknown status code %d", status_code)
def _convert_header_value(self, value): if isinstance(value, bytes): pass elif isinstance(value, unicode_type): value = value.encode('utf-8') elif isinstance(value, numbers.Integral): # return immediately since we know the converted value will be safe return str(value) elif isinstance(value, datetime.datetime): return httputil.format_timestamp(value) else: raise TypeError("Unsupported header value %r" % value) # If \n is allowed into the header, it is possible to inject # additional headers or split the request. if RequestHandler._INVALID_HEADER_CHAR_RE.search(value): raise ValueError("Unsafe header value %r", value) return value
def _handle_request_exception(self, e): if isinstance(e, Finish): # Not an error; just finish the request without logging. if not self._finished: self.finish(*e.args) return try: self.log_exception(*sys.exc_info()) except Exception: # An error here should still get a best-effort send_error() # to avoid leaking the connection. app_log.error("Error in exception logger", exc_info=True) if self._finished: # Extra errors after the request has been finished should # be logged, but there is no reason to continue to try and # send a response. return if isinstance(e, HTTPError): if e.status_code not in httputil.responses and not e.reason: gen_log.error("Bad HTTP status code: %d", e.status_code) self.send_error(500, exc_info=sys.exc_info()) else: self.send_error(e.status_code, exc_info=sys.exc_info()) else: self.send_error(500, exc_info=sys.exc_info())
def _convert_header_value(self, value): if isinstance(value, bytes): pass elif isinstance(value, unicode_type): value = value.encode('utf-8') elif isinstance(value, numbers.Integral): # return immediately since we know the converted value will be safe return str(value) elif isinstance(value, datetime.datetime): return httputil.format_timestamp(value) else: raise TypeError("Unsupported header value %r" % value) # If \n is allowed into the header, it is possible to inject # additional headers or split the request. Also cap length to # prevent obviously erroneous values. if (len(value) > 4000 or RequestHandler._INVALID_HEADER_CHAR_RE.search(value)): raise ValueError("Unsafe header value %r", value) return value
def _handle_request_exception(self, e): if isinstance(e, Finish): # Not an error; just finish the request without logging. if not self._finished: self.finish() return try: self.log_exception(*sys.exc_info()) except Exception: # An error here should still get a best-effort send_error() # to avoid leaking the connection. app_log.error("Error in exception logger", exc_info=True) if self._finished: # Extra errors after the request has been finished should # be logged, but there is no reason to continue to try and # send a response. return if isinstance(e, HTTPError): if e.status_code not in httputil.responses and not e.reason: gen_log.error("Bad HTTP status code: %d", e.status_code) self.send_error(500, exc_info=sys.exc_info()) else: self.send_error(e.status_code, exc_info=sys.exc_info()) else: self.send_error(500, exc_info=sys.exc_info())
def set_status(self, status_code, reason=None): """Sets the status code for our response. :arg int status_code: Response status code. If ``reason`` is ``None``, it must be present in `httplib.responses <http.client.responses>`. :arg string reason: Human-readable reason phrase describing the status code. If ``None``, it will be filled in from `httplib.responses <http.client.responses>`. """ self._status_code = status_code if reason is not None: self._reason = escape.native_str(reason) else: try: self._reason = httputil.responses[status_code] except KeyError: raise ValueError("unknown status code %d" % status_code)
def clear(self): """Resets all headers and content for this response.""" self._headers = httputil.HTTPHeaders({ "Server": "TornadoServer/%s" % tornado.version, "Content-Type": "text/html; charset=UTF-8", "Date": httputil.format_timestamp(time.time()), }) self.set_default_headers() self._write_buffer = [] self._status_code = 200 self._reason = httputil.responses[200]
def cookies(self): """An alias for `self.request.cookies <.httputil.HTTPServerRequest.cookies>`.""" return self.request.cookies
def set_cookie(self, name, value, domain=None, expires=None, path="/", expires_days=None, **kwargs): """Sets the given cookie name/value with the given options. Additional keyword arguments are set on the Cookie.Morsel directly. See http://docs.python.org/library/cookie.html#morsel-objects for available attributes. """ # The cookie library only accepts type str, in both python 2 and 3 name = escape.native_str(name) value = escape.native_str(value) if re.search(r"[\x00-\x20]", name + value): # Don't let us accidentally inject bad stuff raise ValueError("Invalid cookie %r: %r" % (name, value)) if not hasattr(self, "_new_cookie"): self._new_cookie = Cookie.SimpleCookie() if name in self._new_cookie: del self._new_cookie[name] self._new_cookie[name] = value morsel = self._new_cookie[name] if domain: morsel["domain"] = domain if expires_days is not None and not expires: expires = datetime.datetime.utcnow() + datetime.timedelta( days=expires_days) if expires: morsel["expires"] = httputil.format_timestamp(expires) if path: morsel["path"] = path for k, v in kwargs.items(): if k == 'max_age': k = 'max-age' # skip falsy values for httponly and secure flags because # SimpleCookie sets them regardless if k in ['httponly', 'secure'] and not v: continue morsel[k] = v
def headers_received(self, start_line, headers): self.set_request(httputil.HTTPServerRequest( connection=self.connection, start_line=start_line, headers=headers)) if self.stream_request_body: self.request.body = Future() return self.execute()
def __str__(self): message = "HTTP %d: %s" % ( self.status_code, self.reason or httputil.responses.get(self.status_code, 'Unknown')) if self.log_message: return message + " (" + (self.log_message % self.args) + ")" else: return message