我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用tornado.gen.maybe_future()。
def _consume(self, msg): io_loop = tornado.ioloop.IOLoop.instance() if msg.content_type != 'application/json': LOG.warn('invalid content-type header.' ' only json content is acceptable.' ' message rejected.') msg.reject(requeue=False) return False try: data = json_decode(msg.body) except ValueError as e: msg.reject(requeue=False) LOG.warn('malformed json message: %s. reason: %s ' 'message rejected.' % (msg.body, e)) else: future = maybe_future(self._on_message(data)) io_loop.add_future(future, lambda f: self._ack(f, msg))
def _read_chunked_body(self, delegate): # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1 total_size = 0 while True: chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64) chunk_len = int(chunk_len.strip(), 16) if chunk_len == 0: return total_size += chunk_len if total_size > self._max_body_size: raise httputil.HTTPInputError("chunked body too large") bytes_to_read = chunk_len while bytes_to_read: chunk = yield self.stream.read_bytes( min(bytes_to_read, self.params.chunk_size), partial=True) bytes_to_read -= len(chunk) if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): yield gen.maybe_future(delegate.data_received(chunk)) # chunk ends with \r\n crlf = yield self.stream.read_bytes(2) assert crlf == b"\r\n"
def get(self): """Crossdock sends GET requests with query params to initiate test.""" behavior = self.get_query_argument('behavior') respw = ResponseWriter() params = { 'respw': respw, 'server': self.get_query_argument('server', None), 'transport': self.get_query_argument('transport', None), 'encoding': self.get_query_argument('encoding', None), } fn = BEHAVIORS.get(behavior) if fn is None: self.write(json.dumps([{ "status": SKIPPED, "output": "Not implemented", }])) return try: yield gen.maybe_future(fn(**params)) self.write(json.dumps(respw.entries)) except Exception as e: self.write(json.dumps([{ "status": FAILED, "output": "%s" % e }])) return
def process_sayHello(self, seqid, iprot, oprot): args = sayHello_args() args.read(iprot) iprot.readMessageEnd() result = sayHello_result() result.success = yield gen.maybe_future(self._handler.sayHello()) oprot.writeMessageBegin("sayHello", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() # HELPER FUNCTIONS AND STRUCTURES
def process_sayHello(self, seqid, iprot, oprot): args = sayHello_args() args.read(iprot) iprot.readMessageEnd() result = sayHello_result() result.success = yield gen.maybe_future(self._handler.sayHello()) oprot.writeMessageBegin("sayHello", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush()
def process_getData(self, seqid, iprot, oprot): args = getData_args() args.read(iprot) iprot.readMessageEnd() result = getData_result() result.success = yield gen.maybe_future(self._handler.getData(args.input)) oprot.writeMessageBegin("getData", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() # HELPER FUNCTIONS AND STRUCTURES
def async_fetch(self, task, callback=None): '''Do one fetch''' url = task.get('url', 'data:,') if callback is None: callback = self.send_result type = 'None' start_time = time.time() try: if url.startswith('data:'): type = 'data' result = yield gen.maybe_future(self.data_fetch(url, task)) elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'): type = 'phantomjs' result = yield self.phantomjs_fetch(url, task) elif task.get('fetch', {}).get('fetch_type') in ('splash', ): type = 'splash' result = yield self.splash_fetch(url, task) else: type = 'http' result = yield self.http_fetch(url, task) except Exception as e: logger.exception(e) result = self.handle_error(type, url, task, start_time, e) callback(type, task, result) self.on_result(type, task, result) raise gen.Return(result)
def can_fetch(self, user_agent, url): parsed = urlsplit(url) domain = parsed.netloc if domain in self.robots_txt_cache: robot_txt = self.robots_txt_cache[domain] if time.time() - robot_txt.mtime() > self.robot_txt_age: robot_txt = None else: robot_txt = None if robot_txt is None: robot_txt = RobotFileParser() try: response = yield gen.maybe_future(self.http_client.fetch( urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30)) content = response.body except tornado.httpclient.HTTPError as e: logger.error('load robots.txt from %s error: %r', domain, e) content = '' try: content = content.decode('utf8', 'ignore') except UnicodeDecodeError: content = '' robot_txt.parse(content.splitlines()) self.robots_txt_cache[domain] = robot_txt raise gen.Return(robot_txt.can_fetch(user_agent, url))
def mock_tornado(*args, **kwargs): m = mock.Mock(*args, **kwargs) if not len(args) and not kwargs.get('return_value'): m.return_value = gen.maybe_future(mock_tornado) return m
def test_wait_event(self): bot = AB.Bot() test_event = {'unittest': True} waiter = bot.wait_for_event(**test_event) bot.event_to_chat = mock_tornado() bot._get_next_event = mock_tornado( side_effect=[gen.maybe_future(test_event), TestException]) try: yield bot.start() except TestException: pass event = yield waiter self.assertEquals(event, test_event)
def _read_fixed_body(self, content_length, delegate): while content_length > 0: body = yield self.stream.read_bytes( min(self.params.chunk_size, content_length), partial=True) content_length -= len(body) if not self._write_finished or self.is_client: with _ExceptionLoggingContext(app_log): yield gen.maybe_future(delegate.data_received(body))
def data_received(self, chunk): if self._decompressor: compressed_data = chunk while compressed_data: decompressed = self._decompressor.decompress( compressed_data, self._chunk_size) if decompressed: yield gen.maybe_future( self._delegate.data_received(decompressed)) compressed_data = self._decompressor.unconsumed_tail else: yield gen.maybe_future(self._delegate.data_received(chunk))
def _async_request(self, task): """Async request.""" url = task.get('url') if url.startswith('first_task'): result = yield gen.maybe_future(self._fake_request(url, task)) else: try: if task.get('fetch', {}).get('fetch_type') == 'js': result = yield self._phantomjs_request(url, task) else: result = yield self._http_request(url, task) except Exception as e: logger.exception(e) if task.get('process', {}).get('callback'): results, follows, db_name, coll_name = self.processor.handle_result(task, result) if results: # put results to resultdb self.processor.put_results(results, db_name, coll_name, task) if follows: # put new tasks to newtask_queue self.put_follows(follows) raise gen.Return(result)
def process_startTrace(self, seqid, iprot, oprot): args = startTrace_args() args.read(iprot) iprot.readMessageEnd() result = startTrace_result() result.success = yield gen.maybe_future(self._handler.startTrace(args.request)) oprot.writeMessageBegin("startTrace", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush()
def process_joinTrace(self, seqid, iprot, oprot): args = joinTrace_args() args.read(iprot) iprot.readMessageEnd() result = joinTrace_result() result.success = yield gen.maybe_future(self._handler.joinTrace(args.request)) oprot.writeMessageBegin("joinTrace", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() # HELPER FUNCTIONS AND STRUCTURES
def submit(fn, io_loop, *args, **kwargs): """Submit Tornado Coroutine to IOLoop.current(). :param fn: Tornado Coroutine to execute :param io_loop: Tornado IOLoop where to schedule the coroutine :param args: Args to pass to coroutine :param kwargs: Kwargs to pass to coroutine :returns concurrent.futures.Future: future result of coroutine """ future = Future() def execute(): """Execute fn on the IOLoop.""" try: result = gen.maybe_future(fn(*args, **kwargs)) except Exception: # The function we ran didn't return a future and instead raised # an exception. Let's pretend that it returned this dummy # future with our stack trace. f = gen.Future() f.set_exc_info(sys.exc_info()) on_done(f) else: result.add_done_callback(on_done) def on_done(tornado_future): """ Set tornado.Future results to the concurrent.Future. :param tornado_future: """ exception = tornado_future.exception() if not exception: future.set_result(tornado_future.result()) else: future.set_exception(exception) io_loop.add_callback(execute) return future
def process_submitZipkinBatch(self, seqid, iprot, oprot): args = submitZipkinBatch_args() args.read(iprot) iprot.readMessageEnd() result = submitZipkinBatch_result() result.success = yield gen.maybe_future(self._handler.submitZipkinBatch(args.spans)) oprot.writeMessageBegin("submitZipkinBatch", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() # HELPER FUNCTIONS AND STRUCTURES
def process_emitZipkinBatch(self, seqid, iprot, oprot): args = emitZipkinBatch_args() args.read(iprot) iprot.readMessageEnd() yield gen.maybe_future(self._handler.emitZipkinBatch(args.spans)) # HELPER FUNCTIONS AND STRUCTURES
def process_ping(self, seqid, iprot, oprot): args = ping_args() args.read(iprot) iprot.readMessageEnd() result = ping_result() result.success = yield gen.maybe_future(self._handler.ping()) oprot.writeMessageBegin("ping", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() # HELPER FUNCTIONS AND STRUCTURES
def handle_stream(self, stream, address): host, port = address trans = TTornadoStreamTransport( host=host, port=port, stream=stream, io_loop=self.io_loop, read_timeout=self.transport_read_timeout) try: oprot = self._oprot_factory.get_protocol(trans) iprot = self._iprot_factory.get_protocol(TMemoryBuffer()) while not trans.stream.closed(): # TODO: maybe read multiple frames in advance for concurrency try: frame = yield trans.read_frame() except TTransportException as e: if e.type == TTransportException.END_OF_FILE: break else: raise iprot.trans.setvalue(frame) api, seqid, result, call = self._processor.process_in(iprot) if isinstance(result, TApplicationException): self._processor.send_exception(oprot, api, result, seqid) else: try: result.success = yield gen.maybe_future(call()) except Exception as e: # raise if api don't have throws self._processor.handle_exception(e, result) self._processor.send_result(oprot, api, result, seqid) except Exception: logger.exception('thrift exception in handle_stream') trans.close() logger.info('client disconnected %s:%d', host, port)
def start_kernel(self, kernel_id=None, *args, **kwargs): self.log.debug("RemoteMappingKernelManager.start_kernel: {}".format(kwargs['kernel_name'])) kernel_id = yield gen.maybe_future(super(RemoteMappingKernelManager, self).start_kernel(*args, **kwargs)) self.parent.kernel_session_manager.create_session(kernel_id, **kwargs) raise gen.Return(kernel_id)
def process_echo(self, seqid, iprot, oprot): args = echo_args() args.read(iprot) iprot.readMessageEnd() result = echo_result() result.success = yield gen.maybe_future(self._handler.echo(args.str)) oprot.writeMessageBegin("echo", TMessageType.REPLY, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() # HELPER FUNCTIONS AND STRUCTURES