Python twisted.internet.error 模块,ReactorNotRunning() 实例源码

我们从 Python 开源项目中提取了以下 6 个代码示例，用于说明如何使用 twisted.internet.error.ReactorNotRunning()。

项目:privcount    作者:privcount    | 项目源码 | 文件源码
def stop_reactor(exit_code=0):
    '''
    Stop the reactor, then either exit the process or return to the caller.

    exit_code selects what happens after the reactor has been asked to stop:
      - None: don't exit, just return to the caller.
      - 0: graceful exit via sys.exit().
      - 1 to 127: hard exit via os._exit(exit_code).
    Values outside 0..127 are rejected by assertions (which only run when
    Python assertions are enabled).

    A ReactorNotRunning error raised by reactor.stop() is ignored.
    '''
    if exit_code is not None:
        logging.warning("Exiting with code {}".format(exit_code))
    else:
        # Let's hope the calling code exits pretty soon after this
        logging.warning("Stopping reactor")

    try:
        reactor.stop()
    except ReactorNotRunning:
        # nothing to stop; carry on with the requested exit behaviour
        pass

    # return to the caller and let it decide what to do
    # (identity check: None is a singleton, and `==` could be overridden)
    if exit_code is None:
        return

    # a graceful exit: sys.exit() raises SystemExit, so the interpreter
    # unwinds normally
    if exit_code == 0:
        sys.exit()

    # a hard exit: os._exit() terminates the process immediately
    assert exit_code >= 0
    assert exit_code <= 127
    os._exit(exit_code)
项目:congredi    作者:toxik-io    | 项目源码 | 文件源码
def test_peerBeat(self):
    """
    Run each peer-beat helper once, in order, as a smoke test.

    peerFailure() is allowed to raise ReactorNotRunning; any other
    exception propagates to the test runner.
    """
    self.threshold = 0.1
    print('IMPLEMENT tests/test_peerBeat')

    # helpers that take no arguments
    queryBackground()
    updateRead()
    updateWrite()
    peerSuccess()

    # feed a mocked traceback into the failure handler
    mocked_failure = mock_traceback(b'wow')
    try:
        peerFailure(mocked_failure)
    except ReactorNotRunning:
        print('good')

    peerBeat()
项目:ccs-twistedextensions    作者:apple    | 项目源码 | 文件源码
def goodbye(reason):
    """
    Ask the reactor to stop, then pass reason on to origLost.

    A ReactorNotRunning error from reactor.stop() is ignored. Returns
    whatever origLost(reason) returns.
    """
    try:
        reactor.stop()
    except ReactorNotRunning:
        # nothing to stop; still forward the reason below
        pass

    outcome = origLost(reason)
    return outcome
项目:checo    作者:kc1212    | 项目源码 | 文件源码
def stop_reactor():
    """
    Stop the reactor and log that it is stopping.

    If reactor.stop() raises ReactorNotRunning, return quietly without
    logging anything.
    """
    try:
        reactor.stop()
    except error.ReactorNotRunning:
        return
    logging.info("STOPPING REACTOR")
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def stop(self):
    """
    Mark this reactor as stopped.

    See twisted.internet.interfaces.IReactorCore.stop.

    Raises error.ReactorNotRunning if the _stopped flag is already set;
    otherwise sets _stopped, _justStopped and _startedBefore to True.
    """
    if self._stopped:
        message = "Can't stop reactor that isn't running."
        raise error.ReactorNotRunning(message)

    # targets are assigned left to right, matching the original order
    self._stopped = self._justStopped = self._startedBefore = True
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def stop(self):
    """
    Stop the reactor by clearing its _running flag.

    Raises error.ReactorNotRunning if the reactor is not currently
    running.
    """
    if self._running:
        self._running = False
        return
    raise error.ReactorNotRunning()