我已经安装了 Celery,并正在按照 Celery 官方文档中的 First Steps 教程对其进行测试。
我分别尝试过用 Redis 和 RabbitMQ 作为消息代理(broker)和结果后端(backend),但始终无法获取任务结果:
result.get(timeout = 10)
每次,我都会收到此错误:
Traceback (most recent call last):
  File "<input>", line 11, in <module>
  File "/home/mehdi/.virtualenvs/python3/lib/python3.4/site-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/home/mehdi/.virtualenvs/python3/lib/python3.4/site-packages/celery/backends/base.py", line 225, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
消息代理(broker)部分似乎工作正常:当我运行这段代码时
from celery import Celery

# Application named 'tasks': task messages travel through the AMQP broker,
# task state/return values are stored in the Redis result backend.
app = Celery('tasks', backend='redis://localhost/', broker='amqp://')


@app.task
def add(x, y):
    """Return the sum of x and y."""
    return x + y


# Dispatch the task; the returned handle is what result.get() is called on.
result = add.delay(4, 4)
我得到了(按预期)
[2015-08-04 12:05:44,910: INFO/MainProcess] Received task: tasks.add[741160b8-cb7b-4e63-93c3-f5e43f8f8a02]
[2015-08-04 12:05:44,911: INFO/MainProcess] Task tasks.add[741160b8-cb7b-4e63-93c3-f5e43f8f8a02] succeeded in 0.0004287530000510742s: 8
PS:我正在使用Xubuntu 64bit
编辑:
我的app.conf
{'CELERY_RESULT_DB_TABLENAMES': None, 'BROKER_TRANSPORT_OPTIONS': {}, 'BROKER_USE_SSL': False, 'CELERY_BROADCAST_QUEUE': 'celeryctl', 'EMAIL_USE_TLS': False, 'CELERY_STORE_ERRORS_EVEN_IF_IGNORED': False, 'CELERY_CREATE_MISSING_QUEUES': True, 'CELERY_DEFAULT_QUEUE': 'celery', 'CELERY_SEND_TASK_SENT_EVENT': False, 'CELERYD_TASK_TIME_LIMIT': None, 'BROKER_URL': 'amqp://', 'CELERY_EVENT_QUEUE_EXPIRES': None, 'CELERY_DEFAULT_EXCHANGE_TYPE': 'direct', 'CELERYBEAT_SCHEDULER': 'celery.beat:PersistentScheduler', 'CELERY_MAX_CACHED_RESULTS': 100, 'CELERY_RESULT_PERSISTENT': None, 'CELERYD_POOL': 'prefork', 'CELERYD_AGENT': None, 'EMAIL_HOST': 'localhost', 'CELERY_CACHE_BACKEND_OPTIONS': {}, 'BROKER_HEARTBEAT': None, 'CELERY_RESULT_ENGINE_OPTIONS': None, 'CELERY_RESULT_SERIALIZER': 'pickle', 'CELERYBEAT_SCHEDULE_FILENAME': 'celerybeat-schedule', 'CELERY_REDIRECT_STDOUTS_LEVEL': 'WARNING', 'CELERY_IMPORTS': (), 'SERVER_EMAIL': 'celery@localhost', 'CELERYD_TASK_LOG_FORMAT': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'CELERY_SECURITY_CERTIFICATE': None, 'CELERYD_LOG_COLOR': None, 'CELERY_RESULT_EXCHANGE': 'celeryresults', 'CELERY_TRACK_STARTED': False, 'CELERY_REDIS_PASSWORD': None, 'BROKER_USER': None, 'CELERY_COUCHBASE_BACKEND_SETTINGS': None, 'CELERY_RESULT_EXCHANGE_TYPE': 'direct', 'CELERY_REDIS_DB': None, 'CELERYD_TIMER_PRECISION': 1.0, 'CELERY_REDIS_PORT': None, 'BROKER_TRANSPORT': None, 'CELERYMON_LOG_FILE': None, 'CELERYD_CONCURRENCY': 0, 'CELERYD_HIJACK_ROOT_LOGGER': True, 'BROKER_VHOST': None, 'CELERY_DEFAULT_EXCHANGE': 'celery', 'CELERY_DEFAULT_ROUTING_KEY': 'celery', 'CELERY_ALWAYS_EAGER': False, 'EMAIL_TIMEOUT': 2, 'CELERYD_TASK_SOFT_TIME_LIMIT': None, 'CELERY_WORKER_DIRECT': False, 'CELERY_REDIS_HOST': None, 'CELERY_QUEUE_HA_POLICY': None, 'BROKER_PORT': None, 'CELERYD_AUTORELOADER': 'celery.worker.autoreload:Autoreloader', 'BROKER_CONNECTION_TIMEOUT': 4, 'CELERY_ENABLE_REMOTE_CONTROL': True, 
'CELERY_RESULT_DB_SHORT_LIVED_SESSIONS': False, 'CELERY_EVENT_SERIALIZER': 'json', 'CASSANDRA_DETAILED_MODE': False, 'CELERY_REDIS_MAX_CONNECTIONS': None, 'CELERY_CACHE_BACKEND': None, 'CELERYD_PREFETCH_MULTIPLIER': 4, 'BROKER_PASSWORD': None, 'CELERY_BROADCAST_EXCHANGE_TYPE': 'fanout', 'CELERY_EAGER_PROPAGATES_EXCEPTIONS': False, 'CELERY_IGNORE_RESULT': False, 'CASSANDRA_KEYSPACE': None, 'EMAIL_HOST_PASSWORD': None, 'CELERYMON_LOG_LEVEL': 'INFO', 'CELERY_DISABLE_RATE_LIMITS': False, 'CELERY_TASK_PUBLISH_RETRY_POLICY': {'interval_start': 0, 'interval_max': 1, 'max_retries': 3, 'interval_step': 0.2}, 'CELERY_SECURITY_KEY': None, 'CELERY_MONGODB_BACKEND_SETTINGS': None, 'CELERY_DEFAULT_RATE_LIMIT': None, 'CELERYBEAT_SYNC_EVERY': 0, 'CELERY_EVENT_QUEUE_TTL': None, 'CELERYD_POOL_PUTLOCKS': True, 'CELERY_TASK_SERIALIZER': 'pickle', 'CELERYD_WORKER_LOST_WAIT': 10.0, 'CASSANDRA_SERVERS': None, 'CELERYD_POOL_RESTARTS': False, 'CELERY_TASK_PUBLISH_RETRY': True, 'CELERY_ENABLE_UTC': True, 'CELERY_SEND_EVENTS': False, 'BROKER_CONNECTION_MAX_RETRIES': 100, 'CELERYD_LOG_FILE': None, 'CELERYD_FORCE_EXECV': False, 'CELERY_CHORD_PROPAGATES': True, 'CELERYD_AUTOSCALER': 'celery.worker.autoscale:Autoscaler', 'CELERYD_STATE_DB': None, 'CELERY_ROUTES': None, 'CELERYD_TIMER': None, 'ADMINS': (), 'BROKER_HEARTBEAT_CHECKRATE': 3.0, 'CELERY_ACCEPT_CONTENT': ['json', 'pickle', 'msgpack', 'yaml'], 'BROKER_LOGIN_METHOD': None, 'BROKER_CONNECTION_RETRY': True, 'CELERY_TIMEZONE': None, 'CASSANDRA_WRITE_CONSISTENCY': None, 'CELERYBEAT_MAX_LOOP_INTERVAL': 0, 'CELERYD_LOG_LEVEL': 'WARN', 'CELERY_REDIRECT_STDOUTS': True, 'BROKER_POOL_LIMIT': 10, 'CELERY_SECURITY_CERT_STORE': None, 'CELERYD_CONSUMER': 'celery.worker.consumer:Consumer', 'CELERY_INCLUDE': (), 'CELERYD_MAX_TASKS_PER_CHILD': None, 'CELERYD_LOG_FORMAT': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'CELERY_ANNOTATIONS': None, 'CELERY_MESSAGE_COMPRESSION': None, 'CASSANDRA_READ_CONSISTENCY': None, 'EMAIL_USE_SSL': False, 
'CELERY_SEND_TASK_ERROR_EMAILS': False, 'CELERY_QUEUES': None, 'CELERY_ACKS_LATE': False, 'CELERYMON_LOG_FORMAT': '[%(asctime)s: %(levelname)s] %(message)s', 'CELERY_TASK_RESULT_EXPIRES': datetime.timedelta(1), 'BROKER_HOST': None, 'EMAIL_PORT': 25, 'BROKER_FAILOVER_STRATEGY': None, 'CELERY_RESULT_BACKEND': 'rpc://', 'CELERY_BROADCAST_EXCHANGE': 'celeryctl', 'CELERYBEAT_LOG_FILE': None, 'CELERYBEAT_SCHEDULE': {}, 'CELERY_RESULT_DBURI': None, 'CELERY_DEFAULT_DELIVERY_MODE': 2, 'CELERYBEAT_LOG_LEVEL': 'INFO', 'CASSANDRA_COLUMN_FAMILY': None, 'EMAIL_HOST_USER': None}
最终,我改用如下项目布局后就可以正常工作了:
proj/celery_proj/__init__.py
                /celery.py
                /tasks.py
                /test.py
其中:
celery.py
# This module is itself named celery.py; absolute_import keeps
# `from celery import ...` pointing at the installed library rather than
# at this file (relevant on Python 2).
from __future__ import absolute_import

from celery import Celery

# Application object shared by the package. `include` names the modules
# that hold the task definitions.
app = Celery('celery_proj',
             broker='amqp://',
             backend='amqp://',
             include=['celery_proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
tasks.py
from __future__ import absolute_import

from celery_proj.celery import app


@app.task
def add(x, y):
    """Return x + y."""
    return x + y


@app.task
def mul(x, y):
    """Return x * y."""
    return x * y


@app.task
def xsum(numbers):
    """Return the sum of all items in numbers."""
    return sum(numbers)
test.py
__author__ = 'mehdi'

# Extend the import search path so that celery_proj can be imported
# (presumably this directory holds the package -- verify locally).
path = '/home/mehdi/PycharmProjects'
import sys
sys.path.append(path)

from celery_proj.tasks import add

# Dispatch the task, then read its state and value straight away
# (no blocking get() call is made here).
r = add.delay(4, 4)
print(r.status)
print(r.result)
并通过以下命令启动 worker:
# Run from inside the proj directory so the celery_proj package is importable.
cd proj
# Start a worker for the celery_proj app with info-level logging.
celery -A celery_proj worker -l info
然后运行test.py:
python test.py