小编典典

Celery:有没有办法编写自定义JSON编码器/解码器?

json

我有一些要发送到我的应用程序中的芹菜任务的对象。这些对象显然无法使用默认json库进行json序列化。有没有一种方法可以使celery使用自定义JSON
Encoder/ 对这些对象进行序列化/反序列化Decoder


阅读 292

收藏
2020-07-27

共1个答案

小编典典

这里有些晚,但是您应该能够通过在kombu序列化程序注册表中注册自定义编码器和解码器,如docs中所示:http
[//docs.celeryproject.org/en/latest/userguide/calling.html

serializers](http://docs.celeryproject.org/en/latest/userguide/calling.html#serializers)。

例如,以下是Django
的自定义日期时间序列化器/反序列化器(子类化python的内置json模块):

myjson.py (将其放入您的settings.py文件的同一文件夹中)

import json
from datetime import datetime
from time import mktime

class MyEncoder(json.JSONEncoder):   
    def default(self, obj):
        if isinstance(obj, datetime):
            return {
                '__type__': '__datetime__', 
                'epoch': int(mktime(obj.timetuple()))
            }
        else:
            return json.JSONEncoder.default(self, obj)

def my_decoder(obj):
    if '__type__' in obj:
        if obj['__type__'] == '__datetime__':
            return datetime.fromtimestamp(obj['epoch'])
    return obj

# Encoder function      
def my_dumps(obj):
    return json.dumps(obj, cls=MyEncoder)

# Decoder function
def my_loads(obj):
    return json.loads(obj, object_hook=my_decoder)

settings.py

# Register your new serializer methods into kombu
from kombu.serialization import register
from .myjson import my_dumps, my_loads

register('myjson', my_dumps, my_loads, 
    content_type='application/x-myjson',
    content_encoding='utf-8')

# Tell celery to use your new serializer:
CELERY_ACCEPT_CONTENT = ['myjson']
CELERY_TASK_SERIALIZER = 'myjson'
CELERY_RESULT_SERIALIZER = 'myjson'
2020-07-27