我们从Python开源项目中,提取了以下2个代码示例,用于说明如何使用blinker.Namespace()。
def __init__(self, url, watchdog=None, loop=None): self._resp_queue = asyncio.Queue(loop=loop) self.loop = loop assert url.startswith('ws://') or url.startswith('wss://') self.url = url self._websocket = None self._ws_recv_handler_task = None self._watchdog_task = None self.watchdog_time = watchdog self._signal_ns = blinker.Namespace()
def test_register_receivers(base_app, event_entrypoints): """Test signal-receiving/event-emitting functions registration.""" try: _signals = Namespace() my_signal = _signals.signal('my-signal') def event_builder1(event, sender_app, signal_param, *args, **kwargs): event.update(dict(event_param1=signal_param)) return event def event_builder2(event, sender_app, signal_param, *args, **kwargs): event.update(dict(event_param2=event['event_param1'] + 1)) return event base_app.config.update(dict( STATS_EVENTS=dict( event_0=dict( signal=my_signal, event_builders=[event_builder1, event_builder2] ) ) )) InvenioStats(base_app) current_queues.declare() my_signal.send(base_app, signal_param=42) my_signal.send(base_app, signal_param=42) events = [event for event in current_stats.consume('event_0')] # two events should have been created from the sent events. They should # have been both processed by the two event builders. assert events == [{'event_param1': 42, 'event_param2': 43}] * 2 finally: current_queues.delete()