我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rq.Queue()。
def enqueue_periodic_jobs(queue_name):
    """Enqueue every PYBOSSA periodic job that targets ``queue_name``.

    Jobs come from ``get_periodic_jobs(queue_name)``; only those whose
    ``'queue'`` entry equals ``queue_name`` are enqueued.

    Returns a summary string with the number of jobs that were enqueued.
    """
    from pybossa.core import sentinel
    from rq import Queue

    master = sentinel.master
    periodic_jobs = get_periodic_jobs(queue_name)
    target_queue = Queue(queue_name, connection=master)

    enqueued = 0
    for spec in periodic_jobs:
        if spec['queue'] != queue_name:
            continue
        enqueued += 1
        target_queue.enqueue_call(
            func=spec['name'],
            args=spec['args'],
            kwargs=spec['kwargs'],
            timeout=spec['timeout'],
        )
    return "%s jobs in %s have been enqueued" % (enqueued, queue_name)
def tester_tasker(self): if int(time.time())-int(self.time_log["tester"]) >= 1800: logging.info('start enqueue tester') #losd json if os.path.exists('.topics_tester.json'): with open('.topics_tester.json','r') as f: tmp_topics=json.load(f) else: tmp_topics=list() #main sql="SELECT ID FROM TOPIC WHERE (time - created) < 345600 AND ID NOT IN (SELECT T_ID FROM STATUS) AND (STRFTIME('%s','now') - created) > 1209600;" sleep_time=20 self.SQ.cursor.execute(sql) topic_ids=[x[0] for x in self.SQ.cursor.fetchall()] q=Queue('tester',connection=self.redis_conn) for topic_id in topic_ids: if topic_id not in tmp_topics: q.enqueue(topic_tester.start,topic_id, sleep_time) tmp_topics.append(topic_id) #end tmp_topics=list(set(tmp_topics)) with open('.topics_tester.json','w') as f: json.dump(tmp_topics,f) self.time_log["tester"]=str(int(time.time())) return
def main(msg, config, silent=False):
    """Enqueue ``push_messenger(msg)`` on the ``low`` queue.

    :param msg: str, payload passed to ``push_messenger``
    :param config: mapping; the Redis DSN is read from ``config["queue"]["dsn"]``
    :param silent: when exactly ``True`` the job is returned instead of printed
    :return: the enqueued job when ``silent`` is ``True``, otherwise ``None``
    """
    queue_dsn = config["queue"]["dsn"]
    redis_conn = redis.from_url(queue_dsn)
    q = Queue('low', connection=redis_conn)
    ret = q.enqueue(push_messenger, msg, result_ttl=60)
    if silent is True:
        return ret
    # print(x) with a single argument behaves identically on Python 2 and 3,
    # whereas the original ``print ret`` statement is a SyntaxError on Python 3.
    print(ret)
def get_tasks_on_queue(queue_name):
    """List the tasks currently queued on the RQ queue ``queue_name``.

    Each returned dict has the keys ``date_enqueued``, ``task_id`` (the
    40-character ``[A-Z0-9]`` token found in the job description) and
    ``hashes`` (from ``count_valid_hashes_in_task``). Jobs whose description
    contains no such token are skipped.
    """
    q = Queue(queue_name, connection=Redis(host=envget('redis.host')))
    tasks = []
    for job in q.jobs:
        # Serialize the job once and reuse the dict. Per the original inline
        # note, to_dict() yields keys such as 'origin', 'status',
        # 'description' (e.g. "Api.task.generic_task('N7UF...4YEC')"),
        # 'created_at', 'enqueued_at', 'timeout' and 'data'.
        job_info = job.to_dict()
        task = {"date_enqueued": str(process_date(job_info.get('enqueued_at')))}
        # A missing description would make re.search raise TypeError; treat it
        # as "no task id" and skip the job instead.
        task_id = re.search('[A-Z0-9]{40}', job_info.get('description') or '')
        if task_id is None:
            continue
        task['task_id'] = task_id.group(0)
        task['hashes'] = count_valid_hashes_in_task(task['task_id'])
        tasks.append(task)
    return tasks
def add_task(requested):
    """Persist a new task and enqueue it on the queue matching its VT needs.

    ``requested`` is updated in place: a missing ``document_name`` becomes
    ``""``. Returns the generated 40-character task id.
    """
    task_id = id_generator(40)
    if requested.get('document_name') is None:
        requested["document_name"] = ""

    save({"requested": requested,
          "date_enqueued": datetime.datetime.now(),
          "task_id": task_id})

    if requested.get('vt_samples'):
        # task needs a private VT api
        queue_name = "task_private_vt"
    elif requested.get('vt_av'):
        # task needs a public VT api (vt_samples is already known to be falsy)
        queue_name = "task_public_vt"
    else:
        # task doesn't need VT
        queue_name = "task_no_vt"

    task_queue = Queue(queue_name, connection=Redis(host=envget('redis.host')))
    task_queue.enqueue('Api.task.generic_task', args=(task_id,), timeout=31536000)
    return task_id
def get_queue(name=DEFAULT_QUEUE_NAME):
    u'''
    Get a job queue.

    The queue is created and cached in ``_queues`` the first time it is
    requested; later calls return the cached instance.

    :param string name: The name of the queue. If not given then the
        default queue is returned.

    :returns: The job queue.
    :rtype: ``rq.queue.Queue``

    .. seealso:: :py:func:`get_all_queues`
    '''
    fullname = add_queue_name_prefix(name)
    if fullname not in _queues:
        log.debug(u'Initializing background job queue "{}"'.format(name))
        _queues[fullname] = rq.Queue(fullname, connection=_connect())
    return _queues[fullname]
def main():
    """Write random blobs to ``BLOB_DIR`` and enqueue ``process_blob`` for each.

    The number of blobs is read from ``sys.argv[1]`` and defaults to 1 when
    no argument is given. All blobs are written before any job is enqueued.
    """
    queue = Queue(connection=Redis())

    if not os.path.isdir(BLOB_DIR):
        os.mkdir(BLOB_DIR)

    try:
        num_blobs = int(sys.argv[1])
    except IndexError:
        num_blobs = 1

    blob_hashes = []
    for _ in range(num_blobs):
        contents = os.urandom(BLOB_SIZE)
        # The blob is stored under its own SHA-384 hex digest.
        digest = hashlib.sha384(contents).hexdigest()
        with open(os.path.join(BLOB_DIR, digest), 'wb') as blob_file:
            blob_file.write(contents)
        blob_hashes.append(digest)

    for digest in blob_hashes:
        queue.enqueue(process_blob, digest, 1)
def test_max_retries_job(self):
    """Test if the job will fail after max_retries limit is reached"""
    http_requests = setup_mock_redmine_server(max_failures=2)

    args = {
        'url': REDMINE_URL,
        'api_token': 'AAAA',
        'max_issues': 3
    }

    # ``async`` is a reserved word since Python 3.7, so the original
    # ``async=False`` no longer parses; RQ exposes the flag as ``is_async``.
    q = rq.Queue('queue', is_async=False)

    with self.assertRaises(requests.exceptions.HTTPError):
        job = q.enqueue(execute_perceval_job,
                        backend='redmine', backend_args=args,
                        qitems='items', task_id='mytask',
                        max_retries=1)
        # NOTE(review): kept inside the ``with`` block; it is only reached
        # if enqueue() does not raise - confirm the intended placement.
        self.assertEqual(job.is_failed, True)
def test_job_caching_not_supported(self):
    """Check if it fails when caching is not supported"""
    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log.txt')
    }

    # ``async`` is a reserved word since Python 3.7; RQ's flag is ``is_async``.
    q = rq.Queue('queue', is_async=False)

    # Requesting a cache path must raise AttributeError.
    with self.assertRaises(AttributeError):
        job = q.enqueue(execute_perceval_job,
                        backend='git', backend_args=args,
                        qitems='items', task_id='mytask',
                        cache_path=self.tmp_path)

    # Fetching from the cache must raise AttributeError as well.
    with self.assertRaises(AttributeError):
        job = q.enqueue(execute_perceval_job,
                        backend='git', backend_args=args,
                        qitems='items', task_id='mytask',
                        fetch_from_cache=True)
def handle(self, message):
    """Route a ``set_variable`` message to its group, creating it on demand.

    Messages without a ``'key'`` entry, or whose key is not
    ``'set_variable'``, are ignored. A new ``Group`` backed by a fresh
    ``Queue`` is registered in ``self.group_dict`` the first time a
    ``group_id`` is seen; the message is then added as ``(transaction_id, 1)``.
    """
    if 'key' not in message or message['key'] != 'set_variable':
        return

    group_id = message['group_id']
    train_id = message['train_id']
    parallel_count = message['parallel_count']
    variables = message['variables']
    tid = message['transaction_id']

    groups = self.group_dict
    if group_id not in groups:
        rqq = Queue(connection=self.raw_conn)
        groups[group_id] = Group(group_id, train_id, parallel_count, variables, rqq)

    cur_sum_count = 1
    groups[group_id].add_message((tid, cur_sum_count))
def test_set_rq():
    """
    When Redis params are provided, set an RQ instance.
    """
    cfg = get_config('read/set-rq')

    # Should set an instance.
    assert isinstance(cfg.rq, Queue)

    conn_kwargs = cfg.rq.connection.connection_pool.connection_kwargs

    # Should use config args.
    assert conn_kwargs['host'] == 'host'
    assert conn_kwargs['port'] == 1337
    assert conn_kwargs['db'] == 1
def call_fn(fn):
    """Enqueue ``handler.handler`` on queue ``fn`` and run its docker image.

    The request's query arguments are forwarded to the job as keyword
    arguments. Returns ``repr`` of the job result after the container exits.
    """
    handler_kwargs = {name: request.args.get(name) for name in request.args}

    job = Queue(fn, connection=redis_conn).enqueue('handler.handler', kwargs=handler_kwargs)

    # The image lookup happens after the job has been enqueued.
    image_id = images[fn]
    subprocess.check_call(
        ['docker', 'run', '--network', 'scrappyserverless_default', image_id])
    return repr(job.result)


# PLAN: Build docker images from function code
# do not delete
def enqueue_job(job):
    """Enqueue a single job description on the queue it names.

    ``job`` must provide the keys ``queue``, ``name``, ``args``, ``kwargs``
    and ``timeout``. Always returns ``True``.
    """
    from pybossa.core import sentinel
    from rq import Queue

    target_queue = Queue(job['queue'], connection=sentinel.master)
    target_queue.enqueue_call(
        func=job['name'],
        args=job['args'],
        kwargs=job['kwargs'],
        timeout=job['timeout'],
    )
    return True
def check_failed():
    """Check the jobs that have failed and requeue them.

    For every job id in the failed queue a per-job counter is kept in Redis
    under ``pybossa:job:failed:<id>``. While the counter is below the
    ``FAILED_JOBS_RETRIES`` setting the job is requeued; afterwards the
    ``ADMINS`` are mailed once per job (guarded by a second key with a TTL of
    ``FAILED_JOBS_MAILS`` days).

    Returns a status string that lists the failed job ids, if any.
    """
    from rq import get_failed_queue, requeue_job
    from pybossa.core import sentinel

    fq = get_failed_queue()
    job_ids = fq.job_ids
    count = len(job_ids)
    FAILED_JOBS_RETRIES = current_app.config.get('FAILED_JOBS_RETRIES')
    for job_id in job_ids:
        KEY = 'pybossa:job:failed:%s' % job_id
        job = fq.fetch_job(job_id)
        if sentinel.slave.exists(KEY):
            sentinel.master.incr(KEY)
        else:
            ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
            sentinel.master.setex(KEY, ttl, 1)
        if int(sentinel.slave.get(KEY)) < FAILED_JOBS_RETRIES:
            requeue_job(job_id)
        else:
            KEY = 'pybossa:job:failed:mailed:%s' % job_id
            if (not sentinel.slave.exists(KEY) and
                    current_app.config.get('ADMINS')):
                subject = "JOB: %s has failed more than 3 times" % job_id
                body = "Please, review the background jobs of your server."
                body += "\n This is the trace error\n\n"
                body += "------------------------------\n\n"
                # The job may be missing or may carry no traceback;
                # concatenating None would raise and abort the whole check.
                trace = getattr(job, 'exc_info', None)
                body += trace if trace else "(no traceback available)"
                mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                                 subject=subject, body=body)
                send_mail(mail_dict)
                ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
                sentinel.master.setex(KEY, ttl, True)
    if count > 0:
        return "JOBS: %s You have failed the system." % job_ids
    else:
        return "You have not failed the system"
def test_file_drop_GZHandler():
    """ Tests the GZHandler for file drop """
    a = file_drop.GZHandler()

    class Event:
        """ Creates a mock event object for tests """
        event_type = None
        src_path = None
        is_directory = None
        q = None
        r = None

        def __init__(self, event_type, src_path, is_directory):
            """ initializes necessary variables for the object """
            self.event_type = event_type
            self.src_path = src_path
            self.is_directory = is_directory
            self.q = Queue(connection=Redis(host='localhost'), default_timeout=86400)
            # Host corrected from the misspelled 'localhsot' so that it
            # matches the host used for the queue connection above.
            self.r = StrictRedis(host='localhost', port=6379, db=0)

    b = Event("created", "/dev/null", False)
    c = Event("modified", "/etc/hosts", False)

    # Feed the same "created" event to process() three times, then drive
    # the created/modified callbacks.
    a.process(b)
    a.process(b)
    a.process(b)
    a.on_created(b)
    a.on_modified(c)
def make_queue(name='default'):
    """Return an RQ queue called ``name`` on the shared Redis connection.

    The original body ignored ``name`` and always returned RQ's default
    queue; the name is now forwarded to ``Queue``.
    """
    queue = Queue(name, connection=redis_connection())
    return queue
def get_failed_queue_index(name='default'):
    """
    Returns the position of FailedQueue for the named queue in QUEUES_LIST
    """
    # Connection settings of the named queue; the failed queue must share them.
    wanted_kwargs = get_connection(name).connection_pool.connection_kwargs

    for index in range(0, 100):
        candidate = get_queue_by_index(index)
        if candidate.name != 'failed':
            continue
        if candidate.connection.connection_pool.connection_kwargs == wanted_kwargs:
            return index
    return None
def get_queue_index(name='default'):
    """
    Returns the position of Queue for the named queue in QUEUES_LIST
    """
    wanted_kwargs = get_connection(name).connection_pool.connection_kwargs

    for index in range(0, 100):
        candidate = get_queue_by_index(index)
        if candidate.name != name:
            continue
        if candidate.connection.connection_pool.connection_kwargs == wanted_kwargs:
            return index
    return None
def test_get_current_job(self):
    """
    Ensure that functions using RQ's ``get_current_job`` doesn't fail
    when run from rqworker (the job id is not in the failed queue).
    """
    queue = get_queue()
    job = queue.enqueue(access_self)

    # Process the job with a worker that exits once the queue is empty.
    call_command('rqworker', '--burst')

    failed_ids = Queue(name='failed', connection=queue.connection).job_ids
    self.assertFalse(job.id in failed_ids)
    job.delete()
def test_delete_job(self):
    """
    In addition to deleting job from Redis, the job id also needs to be
    deleted from Queue.
    """
    queue = get_queue('django_rq_test')
    queue_index = get_queue_index('django_rq_test')
    job = queue.enqueue(access_self)

    delete_url = reverse('rq_delete_job', args=[queue_index, job.id])
    self.client.post(delete_url, {'post': 'yes'})

    self.assertFalse(Job.exists(job.id, connection=queue.connection))
    self.assertNotIn(job.id, queue.job_ids)
def test__get_queue(self, mod):
    """``mod._get_queue()`` must return an ``rq.Queue`` instance."""
    from rq import Queue

    queue = mod._get_queue()
    assert isinstance(queue, Queue)
def _get_queue():
    """Return the ``low`` RQ queue bound to the module-level ``conn``."""
    # Connect to Redis Queue
    low_queue = Queue('low', connection=conn)
    return low_queue


# These are constants because other components may listen for these
# messages in logs:
def api_notifications():
    """Receive MTurk REST notifications."""
    values = request.values
    event_type = values['Event.1.EventType']
    assignment_id = values.get('Event.1.AssignmentId')
    participant_id = values.get('participant_id')

    # Add the notification to the queue.
    db.logger.debug('rq: Queueing %s with id: %s for worker_function',
                    event_type, assignment_id)
    q.enqueue(worker_function, event_type, assignment_id, participant_id)

    queued_ids = ', '.join(q.job_ids)
    db.logger.debug('rq: Submitted Queue Length: %d (%s)', len(q), queued_ids)

    return success_response()
def main():
    """ Init workers

    Parses the required ``--queues`` option (one or more names) and starts an
    RQ worker listening on those queues over the Redis connection described
    by ``settings.REDIS``.
    """
    parser = argparse.ArgumentParser()
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3, so names
    # are still parsed as text without referencing the Python-2-only
    # ``unicode`` builtin.
    parser.add_argument('--queues', nargs='+', type=type(u''),
                        dest='queues', required=True)
    args = parser.parse_args()

    with Connection(connection=StrictRedis(**settings.REDIS)):
        # Build a real list: on Python 3 ``map`` returns an always-truthy
        # iterator, so the ``or [Queue()]`` fallback could never apply.
        qs = [Queue(name) for name in args.queues] or [Queue()]
        worker = Worker(qs)
        worker.work()
def main():
    """Create the application, attach an RQ queue as ``app.q`` and start it."""
    app = Application(urls)
    redis_url = app.config['redis']['url']
    app.q = Queue(connection=redis.from_url(redis_url))
    app.start()
def get_osf_queue(queue_name):
    """Return the RQ queue ``queue_name`` using ``DEFAULT_JOB_TIMEOUT``."""
    return Queue(
        queue_name,
        connection=get_redis_connection(),
        default_timeout=DEFAULT_JOB_TIMEOUT,
    )
def main():
    """Run an RQ worker on the ``default`` queue over ``redis_connection``."""
    with Connection(redis_connection):
        default_queue = Queue('default')
        Worker(default_queue).work()
def get_job_queue(redis_client=None):
    """Return an RQ queue on ``redis_client``.

    A falsy ``redis_client`` is replaced by ``get_redis_client()``.
    """
    if not redis_client:
        redis_client = get_redis_client()
    return Queue(connection=redis_client)
def __init__(self):
    """Initialise both base classes, the topic job queue and the scheduler."""
    NwRedis.__init__(self)
    NwParser.__init__(self)
    # Jobs on the 'scrape_topics' queue get a default timeout of 200.
    self.topic_job_queue = rq.Queue(
        name='scrape_topics',
        connection=self.redis_connection,
        default_timeout=200,
    )
    self.schedule = BlockingScheduler()
def gen_topic_queue(self):
    """Enqueue ``topic_spider.start`` for RSS topics that are not yet known.

    Nothing happens when ``self.topics_id_sqlite()`` yields 2000 topics or
    fewer. Otherwise every RSS topic that is neither in the SQLite list nor
    in ``.topics_all.json`` is enqueued on the ``topic`` queue, and the
    de-duplicated union of all three sources is written back to
    ``.topics_all.json``.
    """
    logging.debug('start topic enqueue')
    topics_sql = self.topics_id_sqlite()
    if len(topics_sql) <= 2000:
        return
    topics_rss = self.topics_id_rss()

    # load topics
    if os.path.exists('.topics_all.json'):
        with open('.topics_all.json', 'r') as f:
            tmp_topics = json.load(f)
    else:
        tmp_topics = list()

    t_queue = Queue('topic', connection=self.redis_conn)

    # Build the lookup once: the original tested membership against two lists
    # for every RSS topic. The elements are hashable, since the save step
    # below already passes them through set().
    known_topics = set(topics_sql)
    known_topics.update(tmp_topics)

    # gen queue
    for topic in topics_rss:
        if topic not in known_topics:
            topic_id = int(topic)
            t_queue.enqueue(topic_spider.start, topic_id, self.topic_sleep_time)

    # save topics
    topics_all = list()
    topics_all.extend(tmp_topics)
    topics_all.extend(topics_rss)
    topics_all.extend(topics_sql)
    topics_all = list(set(topics_all))
    with open('.topics_all.json', 'w') as f:
        json.dump(topics_all, f)
    return
def tasker(self):
    """Enqueue ``node_spider.start`` jobs for each node tier when it is due.

    One of two configuration tables is chosen from the current hour. Each
    entry gives the SQL selecting node IDs, the ``sleep_time`` passed to the
    job, the minimum number of seconds between runs (``between_time``), the
    ``self.time_log`` key recording the last run, and the target queue name.
    """
    # Table used when the hour is >= 8 or < 2.
    node_configs_1=[{'sql':'SELECT ID FROM NODES WHERE topics >= 8000;','sleep_time':5,'between_time':900,'time_log':'8000_node','queue_name':'node1'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 3000 AND 8000;','sleep_time':10,'between_time':1800,'time_log':'4000_node','queue_name':'node2'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 1000 AND 3000;','sleep_time':20,'between_time':7200,'time_log':'1000_node','queue_name':'node3'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 100 AND 1000;','sleep_time':90,'between_time':86400,'time_log':'500_node','queue_name':'node4'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 1 AND 100;','sleep_time':90,'between_time':86400,'time_log':'0_node','queue_name':'node5'}]
    # Table used for the remaining hours; the first three tiers have double
    # the between_time of the table above.
    node_configs_2=[{'sql':'SELECT ID FROM NODES WHERE topics >= 8000;','sleep_time':5,'between_time':1800,'time_log':'8000_node','queue_name':'node1'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 3000 AND 8000;','sleep_time':10,'between_time':3600,'time_log':'4000_node','queue_name':'node2'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 1000 AND 3000;','sleep_time':20,'between_time':14400,'time_log':'1000_node','queue_name':'node3'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 100 AND 1000;','sleep_time':90,'between_time':86400,'time_log':'500_node','queue_name':'node4'},
                    {'sql':'SELECT ID FROM NODES WHERE topics BETWEEN 1 AND 100;','sleep_time':90,'between_time':86400,'time_log':'0_node','queue_name':'node5'}]
    # NOTE(review): presumably meant to make the hour check below use CST;
    # confirm that assigning time.tzname actually influences time.strftime.
    time.tzname=('CST', 'CST')
    if int(time.strftime('%H')) >= 8 or int(time.strftime('%H')) < 2:
        node_configs=node_configs_1
    else:
        node_configs=node_configs_2
    for node_config in node_configs:
        sql=node_config['sql']
        sleep_time=node_config['sleep_time']
        between_time=node_config['between_time']
        time_log_name=node_config['time_log']
        queue_name=node_config['queue_name']
        q_node=Queue(queue_name,connection=self.redis_conn)
        # Only run a tier once its between_time has elapsed since the last run.
        if int(time.time()) - int(self.time_log[time_log_name]) >= between_time:
            logging.info('start enqueue, queue name: %s' % queue_name)
            self.SQ.cursor.execute(sql)
            node_ids=self.SQ.cursor.fetchall()
            for node_id in node_ids:
                node_id=node_id[0]
                # node4/node5 entries are enqueued only while their ID is
                # still listed in self.node_number; other tiers always are.
                if queue_name not in ['node4','node5'] or (queue_name in ['node4','node5'] and node_id in self.node_number):
                    if queue_name in ['node4','node5']:
                        # Drop the ID so it is not enqueued again for this tier.
                        self.node_number.remove(int(node_id))
                    q_node.enqueue(node_spider.start,node_id,sleep_time)
            # Record when this tier was last enqueued.
            self.time_log[time_log_name]=str(int(time.time()))
    return
def sign_up():
    """For new users only: sign-up page.

    GET renders the sign-up form. POST creates the user unless the e-mail is
    already registered, enqueues ``new_user_full_setup`` for the new account
    and redirects to the recommendations page.
    """
    if request.method == 'POST':
        f_name = request.form.get('f_name')
        l_name = request.form.get('l_name')
        email = request.form.get('email')
        # NOTE(review): the password is handed to User() as submitted; confirm
        # that the model hashes it before it is stored.
        password = request.form.get('password')
        goodreads_uid = int(request.form.get('goodreads_uid'))
        rec_frequency = 1
        # NOTE(review): every new account is created with user_id=1 - verify
        # that this is intended.
        user_id = 1

        user = User.query.filter(User.email == email).all()

        if user != []:
            flash("Looks like you've already signed up! Please log in.")
            return redirect(url_for('index'))
        else:
            new_user = User(email=email, password=password,
                            f_name=f_name, l_name=l_name,
                            goodreads_uid=goodreads_uid,
                            rec_frequency=rec_frequency,
                            sign_up_date=dt.datetime.now(),
                            paused=0, user_id=user_id)
            db.session.add(new_user)
            db.session.commit()
            flash("Welcome to NextBook!")
            session['current_user_id'] = new_user.user_id

            ## new user setup ###
            q = Queue(connection=Redis())
            # The original passed ``gr_user_id``, a name never defined in
            # this function; the Goodreads id parsed above is what the setup
            # job needs.
            results = q.enqueue_call(new_user_full_setup,
                                     args=[goodreads_uid, new_user.user_id, goodreads_key],
                                     ttl=86400)
            session['new_user_job_id'] = results.get_id()

            return redirect(url_for('recommendations'))

    return render_template('sign-up.html')
def fetchmails():
    """Enqueue Gmail processing for the session's credentials.

    Returns a JSON response ``{'jobId': <job id>}``.
    """
    credentials = flask.session['credentials']
    mail_queue = Queue(connection=Redis())
    job = mail_queue.enqueue(gmail.process_user_messages_async, credentials)
    job_id = job.get_id()
    return flask.jsonify({'jobId': job_id})
def checkstatus(job_id):
    """Report the result and status of the RQ job ``job_id`` as JSON.

    The response has the keys ``result`` and ``status``. When no job with
    that id is found, both are ``null`` and the HTTP status is 404 instead of
    the request failing with an AttributeError.
    """
    q = Queue(connection=Redis())
    job = q.fetch_job(job_id)
    if job is None:
        # fetch_job() returned no job for this id.
        return flask.jsonify({'result': None, 'status': None}), 404
    return flask.jsonify({'result': job.return_value,
                          'status': job.get_status()})
def main(msg, config=None, silent=False):
    """Enqueue ``push_messenger(msg)`` on the ``high`` queue.

    :param msg: str, payload passed to ``push_messenger``
    :param config: mapping; the Redis DSN is read from ``config["queue"]["dsn"]``
    :param silent: when truthy, the enqueued job is not printed
    :return: the enqueued job
    """
    queue_dsn = config["queue"]["dsn"]
    redis_conn = redis.from_url(queue_dsn)
    q = Queue('high', connection=redis_conn)
    ret = q.enqueue(push_messenger, msg, result_ttl=60)
    # ``silent`` used to be accepted but ignored; honour it. print(x) with one
    # argument also works on both Python 2 and 3, unlike ``print ret``.
    if not silent:
        print(ret)
    return ret
def main(config):
    """Start an RQ worker on the queues named in ``config["listen"]``.

    ``config`` is also published as the module-level ``worker_config``. The
    Redis DSN is read from ``config["queue"]["dsn"]``.
    """
    global worker_config
    worker_config = config

    listen = config["listen"].values()
    queue_dsn = config["queue"]["dsn"]
    conn = redis.from_url(queue_dsn)

    with Connection(conn):
        # Hand the worker a concrete list; on Python 3 ``map`` would yield a
        # lazy iterator that can only be consumed once.
        worker = Worker([Queue(name) for name in listen])
        worker.work()
def main(config=None):
    """Start an RQ worker on the queues named in ``config["listen"]``.

    The Redis DSN is read from ``config["queue"]["dsn"]``. Although the
    parameter defaults to ``None``, a mapping is required: the lookups below
    raise ``TypeError`` on ``None``.
    """
    listen = config["listen"].values()
    queue_dsn = config["queue"]["dsn"]
    conn = redis.from_url(queue_dsn)

    with Connection(conn):
        # Hand the worker a concrete list; on Python 3 ``map`` would yield a
        # lazy iterator that can only be consumed once.
        worker = Worker([Queue(name) for name in listen])
        worker.work()
def add_hash_to_process_queue(sha1):
    """Enqueue ``process_hash.generic_process_hash(sha1)`` on the ``process`` queue.

    The job is enqueued with a timeout of 70.
    """
    redis_host = envget('redis').get('host')
    process_queue = Queue('process', connection=Redis(host=redis_host))
    process_queue.enqueue('process_hash.generic_process_hash',
                          args=(sha1,), timeout=70)
def remove_queue_name_prefix(name):
    u'''
    Remove a queue name's prefix.

    :raises ValueError: if the given name is not prefixed.

    .. seealso:: :py:func:`add_queue_name_prefix`
    '''
    prefix = _get_queue_name_prefix()
    if name.startswith(prefix):
        # Everything after the prefix is the bare queue name.
        return name[len(prefix):]
    raise ValueError(u'Queue name "{}" is not prefixed.'.format(name))
def get_all_queues():
    u'''
    Return all job queues currently in use.

    Only queues whose name starts with this instance's queue name prefix
    are included.

    :returns: The queues.
    :rtype: List of ``rq.queue.Queue`` instances

    .. seealso:: :py:func:`get_queue`
    '''
    redis_conn = _connect()
    prefix = _get_queue_name_prefix()
    own_queues = []
    for queue in rq.Queue.all(connection=redis_conn):
        if queue.name.startswith(prefix):
            own_queues.append(queue)
    return own_queues
def test_foreign_queues_are_ignored(self):
    u'''
    Test that foreign RQ-queues are ignored.
    '''
    # Create queues for this CKAN instance
    for own_queue in (u'q1', u'q2'):
        self.enqueue(queue=own_queue)

    # Create queue for another CKAN instance
    with changed_config(u'ckan.site_id', u'some-other-ckan-instance'):
        self.enqueue(queue=u'q2')

    # Create queue not related to CKAN
    rq.Queue(u'q4').enqueue_call(jobs.test_job)

    names = set()
    for queue in jobs.get_all_queues():
        names.add(jobs.remove_queue_name_prefix(queue.name))
    assert_equal(names, {u'q1', u'q2'})
def main():
    """Empty the failed queue, register the prism server and run the reactor."""
    # clear the failed task queue
    failed_queue = Queue("failed",
                         connection=get_redis_connection(settings['redis server']))
    failed_queue.empty()

    # start up server
    prism_server = PrismServer()
    for phase, hook in (("startup", prism_server.startService),
                        ("shutdown", prism_server.stopService)):
        reactor.addSystemEventTrigger("before", phase, hook)

    # attempt to redistribute any local blobs
    if settings['enqueue on startup']:
        d = enqueue_on_start()

    reactor.run()
def enqueue_stream(sd_hash, num_blobs_in_stream, db_dir, client_factory_class,
                   redis_address=settings['redis server'], host_infos=None):
    """Enqueue ``process_stream`` for the stream identified by ``sd_hash``.

    The job timeout is ``(num_blobs_in_stream + 1) * 30``.
    """
    job_timeout = (num_blobs_in_stream+1)*30
    stream_queue = Queue(connection=get_redis_connection(redis_address))
    stream_queue.enqueue(process_stream, sd_hash, db_dir, client_factory_class,
                         redis_address, host_infos, timeout=job_timeout)
def enqueue_blob(blob_hash, db_dir, client_factory_class,
                 redis_address=settings['redis server'], host_infos=None):
    """Enqueue ``process_blob`` for ``blob_hash`` with a timeout of 60.

    ``host_infos`` is forwarded to the job, as ``enqueue_stream`` does. The
    original passed ``host_getter``, a name that is not a parameter of this
    function, which left ``host_infos`` unused.
    """
    redis_connection = get_redis_connection(redis_address)
    q = Queue(connection=redis_connection)
    q.enqueue(process_blob, blob_hash, db_dir, client_factory_class,
              redis_address, host_infos, timeout=60)
def on_post(self, req, resp, doc_index):
    """Accept a JSON document and queue it for writing.

    The request body is parsed as JSON and handed to ``postDocument`` through
    the ``document_write`` queue, so the write is processed in the background
    by workers. Responds with HTTP 202 and echoes the parsed JSON.

    Raises ``falcon.HTTPError`` (400) when the body cannot be read or is not
    valid JSON.
    """
    try:
        raw_json = req.stream.read()
    except Exception as ex:
        # ``ex.message`` does not exist on Python 3; str(ex) is portable.
        raise falcon.HTTPError(falcon.HTTP_400, 'Error', str(ex))

    try:
        # UTF-8 is the default; the ``encoding`` keyword was removed from
        # json.loads in Python 3.9.
        result_json = json.loads(raw_json)
    except ValueError:
        raise falcon.HTTPError(falcon.HTTP_400, 'Malformed JSON',
                               'Could not decode the request body. The JSON was incorrect.')

    # Enqueue the write request as a job on the document_write queue.
    q = Queue('document_write', connection=self.db.connection())
    job = q.enqueue_call(
        func=postDocument, args=(result_json, doc_index), result_ttl=5000
    )
    LOG.info('POST request ' + str(job.get_id()))

    resp.status = falcon.HTTP_202
    # json.dumps takes no ``encoding`` keyword on Python 3.
    resp.body = json.dumps(result_json)


# This function handles DELETE requests
def on_delete(self, req, resp, doc_index):
    """
    Enqueue the delete request as a job on the document_delete queue so
    that workers process it in the background.
    """
    delete_queue = Queue('document_delete', connection=self.db.connection())
    job = delete_queue.enqueue_call(func=delDocument,
                                    args=(doc_index,),
                                    result_ttl=5000)
    job_id = str(job.get_id())
    LOG.info('DELETE request ' + job_id)
def test_job_no_result(self):
    """Execute a Git backend job that will not produce any results"""
    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt'),
        'from_date': datetime.datetime(2020, 1, 1, 1, 1, 1)
    }

    # ``async`` is a reserved word since Python 3.7; RQ's flag is ``is_async``.
    q = rq.Queue('queue', is_async=False)
    job = q.enqueue(execute_perceval_job,
                    backend='git', backend_args=args,
                    qitems='items', task_id='mytask')

    result = job.return_value
    self.assertEqual(result.job_id, job.get_id())
    self.assertEqual(result.task_id, 'mytask')
    self.assertEqual(result.backend, 'git')
    self.assertEqual(result.last_uuid, None)
    self.assertEqual(result.max_date, None)
    self.assertEqual(result.nitems, 0)
    self.assertEqual(result.offset, None)
    self.assertEqual(result.nresumed, 0)

    # Nothing must have been pushed to the 'items' list.
    commits = self.conn.lrange('items', 0, -1)
    commits = [pickle.loads(c) for c in commits]
    self.assertListEqual(commits, [])
def __init__(self, queue=None, redis_url=None, queue_name='kaneda'):
    """Set ``self.queue`` from an RQ queue, a Redis URL or a default Redis.

    Precedence: an explicit ``queue`` (must be an RQ ``Queue``), then
    ``redis_url``, then a ``Redis()`` connection with default arguments.

    Raises ``ImproperlyConfigured`` when ``Redis`` or ``Queue`` is
    unavailable or ``queue`` is not a ``Queue`` instance.
    """
    if not Redis:
        raise ImproperlyConfigured('You need to install redis to use the RQ queue.')
    if not Queue:
        raise ImproperlyConfigured('You need to install rq library to use the RQ queue.')

    if queue:
        if not isinstance(queue, Queue):
            raise ImproperlyConfigured('"queue" parameter is not an instance of RQ queue.')
        self.queue = queue
        return

    if redis_url:
        connection = Redis.from_url(redis_url)
    else:
        connection = Redis()
    self.queue = Queue(queue_name, connection=connection)
def __init__(self, data, conn):
    """Store the training settings from ``data`` and enqueue the first task.

    ``data`` must provide ``parallel_count``, ``code_name`` and ``train_id``;
    ``conn`` is the Redis connection used for the RQ queue.
    """
    self.data = data
    self.parallel_count = int(data['parallel_count'])
    self.code_name = data['code_name']
    self.train_id = data['train_id']

    self.cur_iter_id = 0
    self.tasks = []
    self.q = Queue(connection=conn)

    self.enqueue_task()
def main():
    """Enqueue ``slow_fib`` for 20..33 and poll until every result is in.

    The screen is cleared and redrawn every 0.2 seconds; pending results are
    shown as ``(calculating)``.
    """
    # Range of Fibonacci numbers to compute
    fib_range = range(20, 34)

    # Kick off the tasks asynchronously
    q = Queue()
    async_results = {x: q.enqueue(slow_fib, x) for x in fib_range}

    start_time = time.time()
    while True:
        os.system('clear')
        print('Asynchronously: (now = %.2f)' % (time.time() - start_time,))
        pending = False
        for x in fib_range:
            result = async_results[x].return_value
            if result is None:
                pending = True
                result = '(calculating)'
            print('fib(%d) = %s' % (x, result))
        print('')
        print('To start the actual in the background, run a worker:')
        print(' python examples/run_worker.py')
        time.sleep(0.2)
        if not pending:
            break

    print('Done')