Python blinker 模块,Namespace() 实例源码

我们从Python开源项目中,提取了以下2个代码示例,用于说明如何使用blinker.Namespace()

项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def __init__(self, url, watchdog=None, loop=None):
        self._resp_queue = asyncio.Queue(loop=loop)
        self.loop = loop
        assert url.startswith('ws://') or url.startswith('wss://')
        self.url = url
        self._websocket = None
        self._ws_recv_handler_task = None
        self._watchdog_task = None
        self.watchdog_time = watchdog
        self._signal_ns = blinker.Namespace()
项目:invenio-stats    作者:inveniosoftware    | 项目源码 | 文件源码
def test_register_receivers(base_app, event_entrypoints):
    """Test signal-receiving/event-emitting functions registration."""
    try:
        _signals = Namespace()
        my_signal = _signals.signal('my-signal')

        def event_builder1(event, sender_app, signal_param, *args, **kwargs):
            event.update(dict(event_param1=signal_param))
            return event

        def event_builder2(event, sender_app, signal_param, *args, **kwargs):
            event.update(dict(event_param2=event['event_param1'] + 1))
            return event

        base_app.config.update(dict(
            STATS_EVENTS=dict(
                event_0=dict(
                    signal=my_signal,
                    event_builders=[event_builder1, event_builder2]
                )
            )
        ))
        InvenioStats(base_app)
        current_queues.declare()
        my_signal.send(base_app, signal_param=42)
        my_signal.send(base_app, signal_param=42)
        events = [event for event in current_stats.consume('event_0')]
        # two events should have been created from the sent events. They should
        # have been both processed by the two event builders.
        assert events == [{'event_param1': 42, 'event_param2': 43}] * 2
    finally:
        current_queues.delete()