我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用werkzeug.wsgi.LimitedStream()。
def __call__(self, environ, start_response):
    """Cap ``environ['wsgi.input']`` and forward the request to the wrapped app.

    The input stream is wrapped in a ``LimitedStream`` whose limit is the
    smaller of ``self.maximum_size`` and the request's declared
    ``CONTENT_LENGTH``.

    :param environ: the WSGI environment; its ``'wsgi.input'`` entry is
        replaced in place.
    :param start_response: the WSGI ``start_response`` callable, passed
        through unchanged.
    :return: whatever ``self.app(environ, start_response)`` returns.
    """
    # CONTENT_LENGTH is client-controlled: a missing/empty value counts as
    # zero, and a malformed one must not crash the middleware with an
    # unhandled ValueError -- treat it as "no body" as well.
    try:
        content_length = int(environ.get('CONTENT_LENGTH') or 0)
    except (TypeError, ValueError):
        content_length = 0

    # A negative declared length is meaningless; never hand the stream a
    # negative limit.
    limit = min(self.maximum_size, max(content_length, 0))
    environ['wsgi.input'] = LimitedStream(environ['wsgi.input'], limit)
    return self.app(environ, start_response)
def test_other_method_payload(self):
    """A request with a non-standard HTTP method still exposes its body.

    Builds a request whose method is an arbitrary token, then checks that
    ``get_data()`` returns the payload and that ``stream`` is a
    ``LimitedStream``.
    """
    payload = b'Hello World'
    body = BytesIO(payload)
    request = wrappers.Request.from_values(
        input_stream=body,
        content_length=len(payload),
        content_type='text/plain',
        method='WHAT_THE_FUCK',
    )

    self.assert_equal(request.get_data(), payload)
    self.assert_is_instance(request.stream, LimitedStream)
def test_limited_stream_disconnection(self):
    """``LimitedStream.read()`` raises ``ClientDisconnected`` in two cases.

    Covers a source that holds fewer bytes than the declared limit, and a
    source that has already been closed.
    """
    # Case 1: the source runs out of bytes before the 255-byte limit.
    short_source = BytesIO(b'A bit of content')
    truncated = wsgi.LimitedStream(short_source, 255)
    with self.assert_raises(ClientDisconnected):
        truncated.read()

    # Case 2: the source file object was closed before reading.
    closed_source = BytesIO(b'x' * 255)
    closed_source.close()
    orphaned = wsgi.LimitedStream(closed_source, 255)
    with self.assert_raises(ClientDisconnected):
        orphaned.read()