Python twisted.python.log 模块,addObserver() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用twisted.python.log.addObserver()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_errorMessageOnConnectionLostBeforeGenerationFailedDoesNotConfuse(self):
        """
        If the request passed to L{HTTP11ClientProtocol} finished generation
        with an error after the L{HTTP11ClientProtocol}'s connection has been
        lost, an error is logged that gives a non-confusing hint to user on what
        went wrong.
        """
        errors = []
        log.addObserver(errors.append)
        self.addCleanup(log.removeObserver, errors.append)

        def check(ignore):
            error = errors[0]
            self.assertEqual(error[u'why'],
                              u'Error writing request, but not in valid state '
                              u'to finalize request: CONNECTION_LOST')

        return self.test_connectionLostDuringRequestGeneration(
            'errback').addCallback(check)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        """
        Create a temporary file with a fixed payload of 64 bytes.  Create a
        resource for that file and create a request which will be for that
        resource.  Each test can set a different range header to test different
        aspects of the implementation.
        """
        path = FilePath(self.mktemp())
        # This is just a jumble of random stuff.  It's supposed to be a good
        # set of data for this test, particularly in order to avoid
        # accidentally seeing the right result by having a byte sequence
        # repeated at different locations or by having byte values which are
        # somehow correlated with their position in the string.
        self.payload = (b'\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
                        b'\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
                        b'\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
                        b'&\xfd%\xdd\x82q/A\x10Y\x8b')
        path.setContent(self.payload)
        self.file = path.open()
        self.resource = static.File(self.file.name)
        self.resource.isLeaf = 1
        self.request = DummyRequest([b''])
        self.request.uri = self.file.name
        self.catcher = []
        log.addObserver(self.catcher.append)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_malformedHeaderCGI(self):
        """
        Check for the error message in the duplicated header
        """
        cgiFilename = self.writeCGI(BROKEN_HEADER_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(discardBody)
        loggedMessages = []

        def addMessage(eventDict):
            loggedMessages.append(log.textFromEventDict(eventDict))

        log.addObserver(addMessage)
        self.addCleanup(log.removeObserver, addMessage)

        def checkResponse(ignored):
            self.assertIn("ignoring malformed CGI header: 'XYZ'",
                          loggedMessages)

        d.addCallback(checkResponse)
        return d
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_logStderr(self):
        """
        When the _errFlag is set to L{StandardErrorBehavior.LOG},
        L{endpoints._WrapIProtocol} logs stderr (in childDataReceived).
        """
        d = self.ep.connect(self.factory)
        self.successResultOf(d)
        wpp = self.reactor.processProtocol
        log.addObserver(self._stdLog)
        self.addCleanup(log.removeObserver, self._stdLog)

        wpp.childDataReceived(2, b'stderr1')
        self.assertEqual(self.eventLog['executable'], wpp.executable)
        self.assertEqual(self.eventLog['data'], b'stderr1')
        self.assertEqual(self.eventLog['protocol'], wpp.protocol)
        self.assertIn(
            'wrote stderr unhandled by',
            log.textFromEventDict(self.eventLog))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_outputReceivedCompleteLine(self):
        """
        Getting a complete output line generates a log message.
        """
        events = []
        self.addCleanup(log.removeObserver, events.append)
        log.addObserver(events.append)
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # Advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)
        # Long time passes
        self.reactor.advance(self.pm.threshold)
        # Process greets
        self.pm.protocols["foo"].outReceived(b'hello world!\n')
        self.assertEquals(len(events), 1)
        message = events[0]['message']
        self.assertEquals(message, tuple([u'[foo] hello world!']))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_outputReceivedPartialLine(self):
        """
        Getting partial line results in no events until process end
        """
        events = []
        self.addCleanup(log.removeObserver, events.append)
        log.addObserver(events.append)
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # Advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)
        # Long time passes
        self.reactor.advance(self.pm.threshold)
        # Process greets
        self.pm.protocols["foo"].outReceived(b'hello world!')
        self.assertEquals(len(events), 0)
        self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
        self.assertEquals(len(events), 1)
        message = events[0]['message']
        self.assertEquals(message, tuple([u'[foo] hello world!']))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        """
        Add a log observer which records log events in C{self.out}.  Also,
        make sure the default string encoding is ASCII so that
        L{testSingleUnicode} can test the behavior of logging unencodable
        unicode messages.
        """
        self.out = FakeFile()
        self.lp = log.LogPublisher()
        self.flo = log.FileLogObserver(self.out)
        self.lp.addObserver(self.flo.emit)

        try:
            str(u'\N{VULGAR FRACTION ONE HALF}')
        except UnicodeEncodeError:
            # This is the behavior we want - don't change anything.
            self._origEncoding = None
        else:
            if _PY3:
                self._origEncoding = None
                return
            reload(sys)
            self._origEncoding = sys.getdefaultencoding()
            sys.setdefaultencoding('ascii')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_printToStderrSetsIsError(self):
        """
        startLogging()'s overridden sys.stderr should consider everything
        written to it an error.
        """
        self._startLoggingCleanup()
        fakeFile = StringIO()
        log.startLogging(fakeFile)

        def observe(event):
            observed.append(event)
        observed = []
        log.addObserver(observe)

        print("Hello, world.", file=sys.stderr)
        self.assertEqual(observed[0]["isError"], 1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_startLoggingTwice(self):
        """
        There are some obscure error conditions that can occur when logging is
        started twice. See http://twistedmatrix.com/trac/ticket/3289 for more
        information.
        """
        self._startLoggingCleanup()
        # The bug is particular to the way that the t.p.log 'global' function
        # handle stdout. If we use our own stream, the error doesn't occur. If
        # we use our own LogPublisher, the error doesn't occur.
        sys.stdout = StringIO()

        def showError(eventDict):
            if eventDict['isError']:
                sys.__stdout__.write(eventDict['failure'].getTraceback())

        log.addObserver(showError)
        self.addCleanup(log.removeObserver, showError)
        observer = log.startLogging(sys.stdout)
        self.addCleanup(observer.stop)
        # At this point, we expect that sys.stdout is a StdioOnnaStick object.
        self.assertIsInstance(sys.stdout, LoggingFile)
        fakeStdout = sys.stdout
        observer = log.startLogging(sys.stdout)
        self.assertIs(sys.stdout, fakeStdout)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        rootLogger = logging.getLogger("")
        originalLevel = rootLogger.getEffectiveLevel()
        rootLogger.setLevel(logging.DEBUG)

        @self.addCleanup
        def restoreLevel():
            rootLogger.setLevel(originalLevel)
        self.hdlr, self.out = handlerAndBytesIO()
        rootLogger.addHandler(self.hdlr)

        @self.addCleanup
        def removeLogger():
            rootLogger.removeHandler(self.hdlr)
            self.hdlr.close()

        self.lp = log.LogPublisher()
        self.obs = log.PythonLoggingObserver()
        self.lp.addObserver(self.obs.emit)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_startStopObserver(self):
        """
        Test that start and stop methods of the observer actually register
        and unregister to the log system.
        """
        oldAddObserver = log.addObserver
        oldRemoveObserver = log.removeObserver
        l = []
        try:
            log.addObserver = l.append
            log.removeObserver = l.remove
            obs = log.PythonLoggingObserver()
            obs.start()
            self.assertEqual(l[0], obs.emit)
            obs.stop()
            self.assertEqual(len(l), 0)
        finally:
            log.addObserver = oldAddObserver
            log.removeObserver = oldRemoveObserver
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_failureLogger(self):
        """
        The reason argument passed to log.err() appears in the report
        generated by DefaultObserver.
        """
        self.catcher = []
        self.observer = self.catcher.append
        log.addObserver(self.observer)
        self.addCleanup(log.removeObserver, self.observer)

        obs = log.DefaultObserver()
        obs.stderr = StringIO()
        obs.start()
        self.addCleanup(obs.stop)

        reason = "The reason."
        log.err(Exception(), reason)
        errors = self.flushLoggedErrors()

        self.assertIn(reason, obs.stderr.getvalue())
        self.assertEqual(len(errors), 1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_logErrorLogsErrorNoRepr(self):
        """
        The text logged by L{defer.logError} has no repr of the failure.
        """
        output = []

        def emit(eventDict):
            output.append(log.textFromEventDict(eventDict))

        log.addObserver(emit)

        error = failure.Failure(RuntimeError())
        defer.logError(error)
        self.flushLoggedErrors(RuntimeError)

        self.assertTrue(output[0].startswith("Unhandled Error\nTraceback "))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_quiet(self):
        """
        L{TLSMemoryBIOFactory.doStart} and L{TLSMemoryBIOFactory.doStop} do
        not log any messages.
        """
        contextFactory = ServerTLSContext()

        logs = []
        logger = logs.append
        log.addObserver(logger)
        self.addCleanup(log.removeObserver, logger)
        wrappedFactory = ServerFactory()
        # Disable logging on the wrapped factory:
        wrappedFactory.doStart = lambda: None
        wrappedFactory.doStop = lambda: None
        factory = TLSMemoryBIOFactory(contextFactory, False, wrappedFactory)
        factory.doStart()
        factory.doStop()
        self.assertEqual(logs, [])
项目:retwist    作者:trustyou    | 项目源码 | 文件源码
def test_error_handling():
    """
    Check that exceptions in json_GET result in a 500 response code.
    """

    def err_observer(event):
        # type: (dict) -> None
        assert event["isError"]
        failure = event["failure"]
        assert isinstance(failure, Failure)
        exception = failure.value
        assert isinstance(exception, TypeError)
        err_observer.called = True

    log.addObserver(err_observer)

    for resource in [SyncBrokenPage(), AsyncBrokenPage()]:
        request = MyDummyRequest([b"/"])
        err_observer.called = False
        yield _render(resource, request)

        assert request.responseCode == 500
        assert err_observer.called is True, "Error handler not called for {}".format(type(resource).__name__)

    log.removeObserver(err_observer)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.ports = []
        self.messages = []
        log.addObserver(self.messages.append)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.catcher = []
        log.addObserver(self.catcher.append)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testErroneousErrors(self):
        L1 = []
        L2 = []
        log.addObserver(lambda events: L1.append(events))
        log.addObserver(lambda events: 1/0)
        log.addObserver(lambda events: L2.append(events))
        log.msg("Howdy, y'all.")

        # XXX - use private _flushErrors so we don't also catch
        # the deprecation warnings
        excs = [f.type for f in log._flushErrors(ZeroDivisionError)]
        self.assertEquals([ZeroDivisionError], excs)

        self.assertEquals(len(L1), 2)
        self.assertEquals(len(L2), 2)

        self.assertEquals(L1[1]['message'], ("Howdy, y'all.",))
        self.assertEquals(L2[0]['message'], ("Howdy, y'all.",))

        # The observer has been removed, there should be no exception
        log.msg("Howdy, y'all.")

        self.assertEquals(len(L1), 3)
        self.assertEquals(len(L2), 3)
        self.assertEquals(L1[2]['message'], ("Howdy, y'all.",))
        self.assertEquals(L2[2]['message'], ("Howdy, y'all.",))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        # Fuck you Python.
        reload(sys)
        self._origEncoding = sys.getdefaultencoding()
        sys.setdefaultencoding('ascii')

        self.out = FakeFile()
        self.lp = log.LogPublisher()
        self.flo = log.FileLogObserver(self.out)
        self.lp.addObserver(self.flo.emit)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.c = []
        log.addObserver(self.c.append)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _setUpLogWarnings(self):
        if self._logWarnings:
            return
        def seeWarnings(x):
           if x.has_key('warning'):
               print
               print x['format'] % x
        log.addObserver(seeWarnings)
        self._logWarnings = True
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.ports = []
        self.messages = []
        log.addObserver(self.messages.append)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.catcher = []
        log.addObserver(self.catcher.append)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testErroneousErrors(self):
        L1 = []
        L2 = []
        log.addObserver(lambda events: L1.append(events))
        log.addObserver(lambda events: 1/0)
        log.addObserver(lambda events: L2.append(events))
        log.msg("Howdy, y'all.")

        # XXX - use private _flushErrors so we don't also catch
        # the deprecation warnings
        excs = [f.type for f in log._flushErrors(ZeroDivisionError)]
        self.assertEquals([ZeroDivisionError], excs)

        self.assertEquals(len(L1), 2)
        self.assertEquals(len(L2), 2)

        self.assertEquals(L1[1]['message'], ("Howdy, y'all.",))
        self.assertEquals(L2[0]['message'], ("Howdy, y'all.",))

        # The observer has been removed, there should be no exception
        log.msg("Howdy, y'all.")

        self.assertEquals(len(L1), 3)
        self.assertEquals(len(L2), 3)
        self.assertEquals(L1[2]['message'], ("Howdy, y'all.",))
        self.assertEquals(L2[2]['message'], ("Howdy, y'all.",))
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        # Fuck you Python.
        reload(sys)
        self._origEncoding = sys.getdefaultencoding()
        sys.setdefaultencoding('ascii')

        self.out = FakeFile()
        self.lp = log.LogPublisher()
        self.flo = log.FileLogObserver(self.out)
        self.lp.addObserver(self.flo.emit)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def setUp(self):
        self.c = []
        log.addObserver(self.c.append)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _setUpLogWarnings(self):
        if self._logWarnings:
            return
        def seeWarnings(x):
           if x.has_key('warning'):
               print
               print x['format'] % x
        log.addObserver(seeWarnings)
        self._logWarnings = True
项目:ccs-twistedextensions    作者:apple    | 项目源码 | 文件源码
def addFilteredObserver(cls, observer):
        log.addObserver(FilteringLogObserver(
            observer,
            [cls.filterPredicate]
        ))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _getLogObserver(self):
        """
        Create and return a suitable log observer for the given configuration.

        The observer will go to syslog using the prefix C{_syslogPrefix} if
        C{_syslog} is true.  Otherwise, it will go to the file named
        C{_logfilename} or, if C{_nodaemon} is true and C{_logfilename} is
        C{"-"}, to stdout.

        @return: An object suitable to be passed to C{log.addObserver}.
        """
        if self._syslog:
            # FIXME: Requires twisted.python.syslog to be ported to Py3
            # https://twistedmatrix.com/trac/ticket/7957
            from twisted.python import syslog
            return syslog.SyslogObserver(self._syslogPrefix).emit

        if self._logfilename == '-':
            if not self._nodaemon:
                sys.exit('Daemons cannot log to stdout, exiting!')
            logFile = sys.stdout
        elif self._nodaemon and not self._logfilename:
            logFile = sys.stdout
        else:
            if not self._logfilename:
                self._logfilename = 'twistd.log'
            logFile = logfile.LogFile.fromFullPath(self._logfilename)
            try:
                import signal
            except ImportError:
                pass
            else:
                # Override if signal is set to None or SIG_DFL (0)
                if not signal.getsignal(signal.SIGUSR1):
                    def rotateLog(signal, frame):
                        from twisted.internet import reactor
                        reactor.callFromThread(logFile.rotate)
                    signal.signal(signal.SIGUSR1, rotateLog)
        return logger.textFileLogObserver(logFile)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_ignored1XXResponseCausesLog(self):
        """
        When a 1XX response is ignored, Twisted emits a log.
        """
        sample103Response = (
            b'HTTP/1.1 103 Early Hints\r\n'
            b'Server: socketserver/1.0.0\r\n'
            b'Link: </other/styles.css>; rel=preload; as=style\r\n'
            b'Link: </other/action.js>; rel=preload; as=script\r\n'
            b'\r\n'
        )

        # Catch the logs.
        logs = []
        log.addObserver(logs.append)
        self.addCleanup(log.removeObserver, logs.append)

        protocol = HTTPClientParser(
            Request(b'GET', b'/', _boringHeaders, None),
            lambda ign: None
        )
        protocol.makeConnection(StringTransport())
        protocol.dataReceived(sample103Response)

        self.assertEqual(
            logs[0]['message'][0], 'Ignoring unexpected 103 response'
        )
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def observe(self):
        loggedMessages = []
        log.addObserver(loggedMessages.append)
        self.addCleanup(log.removeObserver, loggedMessages.append)
        return loggedMessages
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def stopOnError(case, reactor, publisher=None):
    """
    Stop the reactor as soon as any error is logged on the given publisher.

    This is beneficial for tests which will wait for a L{Deferred} to fire
    before completing (by passing or failing).  Certain implementation bugs may
    prevent the L{Deferred} from firing with any result at all (consider a
    protocol's {dataReceived} method that raises an exception: this exception
    is logged but it won't ever cause a L{Deferred} to fire).  In that case the
    test would have to complete by timing out which is a much less desirable
    outcome than completing as soon as the unexpected error is encountered.

    @param case: A L{SynchronousTestCase} to use to clean up the necessary log
        observer when the test is over.
    @param reactor: The reactor to stop.
    @param publisher: A L{LogPublisher} to watch for errors.  If L{None}, the
        global log publisher will be watched.
    """
    if publisher is None:
        from twisted.python import log as publisher
    running = [None]
    def stopIfError(event):
        if running and event.get('isError'):
            running.pop()
            reactor.stop()
    publisher.addObserver(stopIfError)
    case.addCleanup(publisher.removeObserver, stopIfError)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_stderrSkip(self):
        """
        When the _errFlag is set to L{StandardErrorBehavior.DROP},
        L{endpoints._WrapIProtocol} ignores stderr.
        """
        self.ep._errFlag = StandardErrorBehavior.DROP
        d = self.ep.connect(self.factory)
        self.successResultOf(d)
        wpp = self.reactor.processProtocol
        log.addObserver(self._stdLog)
        self.addCleanup(log.removeObserver, self._stdLog)

        wpp.childDataReceived(2, b'stderr2')
        self.assertIsNone(self.eventLog)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        self.ports = []
        self.messages = []
        log.addObserver(self.messages.append)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        self.catcher = []
        self.observer = self.catcher.append
        log.addObserver(self.observer)
        self.addCleanup(log.removeObserver, self.observer)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_erroneousErrors(self):
        """
        Exceptions raised by log observers are logged but the observer which
        raised the exception remains registered with the publisher.  These
        exceptions do not prevent the event from being sent to other observers
        registered with the publisher.
        """
        L1 = []
        L2 = []

        def broken(event):
            1 // 0

        for observer in [L1.append, broken, L2.append]:
            log.addObserver(observer)
            self.addCleanup(log.removeObserver, observer)

        for i in range(3):
            # Reset the lists for simpler comparison.
            L1[:] = []
            L2[:] = []

            # Send out the event which will break one of the observers.
            log.msg("Howdy, y'all.", log_trace=[])

            # The broken observer should have caused this to be logged.
            excs = self.flushLoggedErrors(ZeroDivisionError)
            del self.catcher[:]
            self.assertEqual(len(excs), 1)

            # Both other observers should have seen the message.
            self.assertEqual(len(L1), 2)
            self.assertEqual(len(L2), 2)

            # The first event is delivered to all observers; then, errors
            # are delivered.
            self.assertEqual(L1[0]['message'], ("Howdy, y'all.",))
            self.assertEqual(L2[0]['message'], ("Howdy, y'all.",))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_showwarning(self):
        """
        L{twisted.python.log.showwarning} emits the warning as a message
        to the Twisted logging system.
        """
        publisher = log.LogPublisher()
        publisher.addObserver(self.observer)

        publisher.showwarning(
            FakeWarning("unique warning message"), FakeWarning,
            "warning-filename.py", 27)
        event = self.catcher.pop()
        self.assertEqual(
            event['format'] % event,
            'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
            'unique warning message')
        self.assertEqual(self.catcher, [])

        # Python 2.6 requires that any function used to override the
        # warnings.showwarning API accept a "line" parameter or a
        # deprecation warning is emitted.
        publisher.showwarning(
            FakeWarning("unique warning message"), FakeWarning,
            "warning-filename.py", 27, line=object())
        event = self.catcher.pop()
        self.assertEqual(
            event['format'] % event,
            'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
            'unique warning message')
        self.assertEqual(self.catcher, [])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_emitPrefix(self):
        """
        FileLogObserver.emit() will add a timestamp and system prefix to its
        file output.
        """
        output = StringIO()
        flo = log.FileLogObserver(output)
        events = []

        def observer(event):
            # Capture the event for reference and pass it along to flo
            events.append(event)
            flo.emit(event)

        publisher = log.LogPublisher()
        publisher.addObserver(observer)

        publisher.msg("Hello!")
        self.assertEqual(len(events), 1)
        event = events[0]

        result = output.getvalue()
        prefix = "{time} [{system}] ".format(
            time=flo.formatTime(event["time"]), system=event["system"],
        )

        self.assertTrue(
            result.startswith(prefix),
            "{0!r} does not start with {1!r}".format(result, prefix)
        )
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setUp(self):
        self.resultLogs = []
        log.addObserver(self.resultLogs.append)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _add(self):
        if self._added == 0:
            log.addObserver(self.gotEvent)
        self._added += 1
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def assertLogMessage(testCase, expectedMessages, callable, *args, **kwargs):
    """
    Assert that the callable logs the expected messages when called.

    XXX: Put this somewhere where it can be re-used elsewhere. See #6677.

    @param testCase: The test case controlling the test which triggers the
        logged messages and on which assertions will be called.
    @type testCase: L{unittest.SynchronousTestCase}

    @param expectedMessages: A L{list} of the expected log messages
    @type expectedMessages: L{list}

    @param callable: The function which is expected to produce the
        C{expectedMessages} when called.
    @type callable: L{callable}

    @param args: Positional arguments to be passed to C{callable}.
    @type args: L{list}

    @param kwargs: Keyword arguments to be passed to C{callable}.
    @type kwargs: L{dict}
    """
    loggedMessages = []
    log.addObserver(loggedMessages.append)
    testCase.addCleanup(log.removeObserver, loggedMessages.append)

    callable(*args, **kwargs)

    testCase.assertEqual(
        [m['message'][0] for m in loggedMessages],
        expectedMessages)
项目:retwist    作者:trustyou    | 项目源码 | 文件源码
def enable_sentry_reporting(client):
    # type: (Client) -> None
    """
    Enable Sentry logging for any errors reported via twisted.python.log.err.
    :param client: Already configured raven.Client
    """
    global raven_client
    raven_client = client
    log.addObserver(log_to_sentry)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_startLoggingOverridesWarning(self):
        """
        startLogging() overrides global C{warnings.showwarning} such that
        warnings go to Twisted log observers.
        """
        self._startLoggingCleanup()
        newPublisher = NewLogPublisher()

        class SysModule(object):
            stdout = object()
            stderr = object()

        tempLogPublisher = LogPublisher(
            newPublisher, newPublisher,
            logBeginner=LogBeginner(newPublisher, StringIO(), SysModule,
                                    warnings)
        )
        # Trial reports warnings in two ways.  First, it intercepts the global
        # 'showwarning' function *itself*, after starting logging (by way of
        # the '_collectWarnings' function which collects all warnings as a
        # around the test's 'run' method).  Second, it has a log observer which
        # immediately reports warnings when they're propagated into the log
        # system (which, in normal operation, happens only at the end of the
        # test case).  In order to avoid printing a spurious warning in this
        # test, we first replace the global log publisher's 'showwarning' in
        # the module with our own.
        self.patch(log, "theLogPublisher", tempLogPublisher)
        # And, one last thing, pretend we're starting from a fresh import, or
        # warnings.warn won't be patched at all.
        log._oldshowwarning = None
        # Global mutable state is bad, kids.  Stay in school.
        fakeFile = StringIO()
        # We didn't previously save log messages, so let's make sure we don't
        # save them any more.
        evt = {"pre-start": "event"}
        received = []

        def preStartObserver(x):
            if 'pre-start' in x.keys():
                received.append(x)

        newPublisher(evt)
        newPublisher.addObserver(preStartObserver)
        log.startLogging(fakeFile, setStdout=False)
        self.addCleanup(tempLogPublisher._stopLogging)
        self.assertEqual(received, [])
        warnings.warn("hello!")
        output = fakeFile.getvalue()
        self.assertIn("UserWarning: hello!", output)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def resumeProducingRaises(self, consumer, expectedExceptions):
        """
        Common implementation for tests where the underlying producer throws
        an exception when its resumeProducing is called.
        """
        class ThrowingProducer(NonStreamingProducer):

            def resumeProducing(self):
                if self.counter == 2:
                    return 1/0
                else:
                    NonStreamingProducer.resumeProducing(self)

        nsProducer = ThrowingProducer(consumer)
        streamingProducer = _PullToPush(nsProducer, consumer)
        consumer.registerProducer(streamingProducer, True)

        # Register log observer:
        loggedMsgs = []
        log.addObserver(loggedMsgs.append)
        self.addCleanup(log.removeObserver, loggedMsgs.append)

        # Make consumer unregister do what TLSMemoryBIOProtocol would do:
        def unregister(orig=consumer.unregisterProducer):
            orig()
            streamingProducer.stopStreaming()
        consumer.unregisterProducer = unregister

        # Start streaming:
        streamingProducer.startStreaming()

        done = streamingProducer._coopTask.whenDone()
        done.addErrback(lambda reason: reason.trap(TaskStopped))
        def stopped(ign):
            self.assertEqual(consumer.value(), b"01")
            # Any errors from resumeProducing were logged:
            errors = self.flushLoggedErrors()
            self.assertEqual(len(errors), len(expectedExceptions))
            for f, (expected, msg), logMsg in zip(
                errors, expectedExceptions, loggedMsgs):
                self.assertTrue(f.check(expected))
                self.assertIn(msg, logMsg['why'])
            # And the streaming wrapper stopped:
            self.assertTrue(streamingProducer._finished)
        done.addCallback(stopped)
        return done
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def start(self, logfile=None, application_name="ooniprobe"):
        from ooni.settings import config

        if not logfile:
            logfile = os.path.expanduser(config.basic.logfile)

        log_folder = os.path.dirname(logfile)
        if (not os.access(log_folder, os.W_OK) or
            (os.path.exists(logfile) and not os.access(logfile, os.W_OK))):
            # If we don't have permissions to write to the log_folder or
            # logfile.
            log_folder = config.running_path
            logfile = os.path.join(log_folder, "ooniprobe.log")

        self.log_filepath = logfile

        mkdir_p(log_folder)

        log_filename = os.path.basename(logfile)
        file_log_level = levels.get(config.basic.loglevel,
                                    levels['INFO'])
        stdout_log_level = levels['INFO']
        if config.advanced.debug:
            stdout_log_level = levels['DEBUG']

        if config.basic.rotate == 'daily':
            logfile = MyDailyLogFile(log_filename, log_folder)
        elif config.basic.rotate == 'length':
            logfile = LogFile(log_filename, log_folder,
                              rotateLength=int(human_size_to_bytes(
                                  config.basic.rotate_length
                              )),
                              maxRotatedFiles=config.basic.max_rotated_files)
        else:
            logfile = open(os.path.join(log_folder, log_filename), 'a')

        self.fileObserver = MsecLogObserver(logfile, log_level=file_log_level)
        self.stdoutObserver = StdoutStderrObserver(sys.stdout,
                                                   log_level=stdout_log_level)

        tw_log.startLoggingWithObserver(self.fileObserver.emit)
        tw_log.addObserver(self.stdoutObserver.emit)

        tw_log.msg("Starting %s on %s (%s UTC)" % (application_name,
                                                   otime.prettyDateNow(),
                                                   otime.prettyDateNowUTC()))