The following 45 code examples, extracted from open source Python projects, illustrate how to use twisted.python.log.addObserver().
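Before the project examples below, here is a minimal sketch of the pattern they all share: register an observer callable that receives event dictionaries, emit a message, then unregister the observer. The observer name and the logged message are illustrative only; addObserver(), removeObserver(), msg() and textFromEventDict() are the standard twisted.python.log API.

from twisted.python import log

def observer(eventDict):
    # Each log event is a plain dict; 'message' holds the tuple of
    # arguments passed to log.msg(), and 'isError' flags error events.
    print(log.textFromEventDict(eventDict))

log.addObserver(observer)
try:
    log.msg("hello from the legacy log API")
finally:
    # Always unregister, otherwise the observer keeps receiving events.
    # The tests below typically do this with self.addCleanup().
    log.removeObserver(observer)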
def test_errorMessageOnConnectionLostBeforeGenerationFailedDoesNotConfuse(self):
    """
    If the request passed to L{HTTP11ClientProtocol} finished generation
    with an error after the L{HTTP11ClientProtocol}'s connection has been
    lost, an error is logged that gives a non-confusing hint to user on
    what went wrong.
    """
    errors = []
    log.addObserver(errors.append)
    self.addCleanup(log.removeObserver, errors.append)

    def check(ignore):
        error = errors[0]
        self.assertEqual(error[u'why'],
                         u'Error writing request, but not in valid state '
                         u'to finalize request: CONNECTION_LOST')

    return self.test_connectionLostDuringRequestGeneration(
        'errback').addCallback(check)
def setUp(self): """ Create a temporary file with a fixed payload of 64 bytes. Create a resource for that file and create a request which will be for that resource. Each test can set a different range header to test different aspects of the implementation. """ path = FilePath(self.mktemp()) # This is just a jumble of random stuff. It's supposed to be a good # set of data for this test, particularly in order to avoid # accidentally seeing the right result by having a byte sequence # repeated at different locations or by having byte values which are # somehow correlated with their position in the string. self.payload = (b'\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7' b'\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0' b'\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d' b'&\xfd%\xdd\x82q/A\x10Y\x8b') path.setContent(self.payload) self.file = path.open() self.resource = static.File(self.file.name) self.resource.isLeaf = 1 self.request = DummyRequest([b'']) self.request.uri = self.file.name self.catcher = [] log.addObserver(self.catcher.append)
def test_malformedHeaderCGI(self):
    """
    Check for the error message in the duplicated header
    """
    cgiFilename = self.writeCGI(BROKEN_HEADER_CGI)

    portnum = self.startServer(cgiFilename)
    url = "http://localhost:%d/cgi" % (portnum,)
    agent = client.Agent(reactor)
    d = agent.request(b"GET", url)
    d.addCallback(discardBody)
    loggedMessages = []

    def addMessage(eventDict):
        loggedMessages.append(log.textFromEventDict(eventDict))

    log.addObserver(addMessage)
    self.addCleanup(log.removeObserver, addMessage)

    def checkResponse(ignored):
        self.assertIn("ignoring malformed CGI header: 'XYZ'",
                      loggedMessages)

    d.addCallback(checkResponse)
    return d
def test_logStderr(self):
    """
    When the _errFlag is set to L{StandardErrorBehavior.LOG},
    L{endpoints._WrapIProtocol} logs stderr (in childDataReceived).
    """
    d = self.ep.connect(self.factory)
    self.successResultOf(d)
    wpp = self.reactor.processProtocol
    log.addObserver(self._stdLog)
    self.addCleanup(log.removeObserver, self._stdLog)

    wpp.childDataReceived(2, b'stderr1')
    self.assertEqual(self.eventLog['executable'], wpp.executable)
    self.assertEqual(self.eventLog['data'], b'stderr1')
    self.assertEqual(self.eventLog['protocol'], wpp.protocol)
    self.assertIn(
        'wrote stderr unhandled by',
        log.textFromEventDict(self.eventLog))
def test_outputReceivedCompleteLine(self):
    """
    Getting a complete output line generates a log message.
    """
    events = []
    self.addCleanup(log.removeObserver, events.append)
    log.addObserver(events.append)
    self.pm.addProcess("foo", ["foo"])
    # Schedule the process to start
    self.pm.startService()
    # Advance the reactor to start the process
    self.reactor.advance(0)
    self.assertIn("foo", self.pm.protocols)

    # Long time passes
    self.reactor.advance(self.pm.threshold)

    # Process greets
    self.pm.protocols["foo"].outReceived(b'hello world!\n')
    self.assertEquals(len(events), 1)
    message = events[0]['message']
    self.assertEquals(message, tuple([u'[foo] hello world!']))
def test_outputReceivedPartialLine(self):
    """
    Getting partial line results in no events until process end
    """
    events = []
    self.addCleanup(log.removeObserver, events.append)
    log.addObserver(events.append)
    self.pm.addProcess("foo", ["foo"])
    # Schedule the process to start
    self.pm.startService()
    # Advance the reactor to start the process
    self.reactor.advance(0)
    self.assertIn("foo", self.pm.protocols)

    # Long time passes
    self.reactor.advance(self.pm.threshold)

    # Process greets
    self.pm.protocols["foo"].outReceived(b'hello world!')
    self.assertEquals(len(events), 0)
    self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
    self.assertEquals(len(events), 1)
    message = events[0]['message']
    self.assertEquals(message, tuple([u'[foo] hello world!']))
def setUp(self): """ Add a log observer which records log events in C{self.out}. Also, make sure the default string encoding is ASCII so that L{testSingleUnicode} can test the behavior of logging unencodable unicode messages. """ self.out = FakeFile() self.lp = log.LogPublisher() self.flo = log.FileLogObserver(self.out) self.lp.addObserver(self.flo.emit) try: str(u'\N{VULGAR FRACTION ONE HALF}') except UnicodeEncodeError: # This is the behavior we want - don't change anything. self._origEncoding = None else: if _PY3: self._origEncoding = None return reload(sys) self._origEncoding = sys.getdefaultencoding() sys.setdefaultencoding('ascii')
def test_printToStderrSetsIsError(self):
    """
    startLogging()'s overridden sys.stderr should consider everything
    written to it an error.
    """
    self._startLoggingCleanup()
    fakeFile = StringIO()
    log.startLogging(fakeFile)

    def observe(event):
        observed.append(event)

    observed = []
    log.addObserver(observe)

    print("Hello, world.", file=sys.stderr)
    self.assertEqual(observed[0]["isError"], 1)
def test_startLoggingTwice(self):
    """
    There are some obscure error conditions that can occur when logging
    is started twice. See http://twistedmatrix.com/trac/ticket/3289 for
    more information.
    """
    self._startLoggingCleanup()
    # The bug is particular to the way that the t.p.log 'global' function
    # handle stdout. If we use our own stream, the error doesn't occur.
    # If we use our own LogPublisher, the error doesn't occur.
    sys.stdout = StringIO()

    def showError(eventDict):
        if eventDict['isError']:
            sys.__stdout__.write(eventDict['failure'].getTraceback())

    log.addObserver(showError)
    self.addCleanup(log.removeObserver, showError)
    observer = log.startLogging(sys.stdout)
    self.addCleanup(observer.stop)
    # At this point, we expect that sys.stdout is a StdioOnnaStick object.
    self.assertIsInstance(sys.stdout, LoggingFile)
    fakeStdout = sys.stdout
    observer = log.startLogging(sys.stdout)
    self.assertIs(sys.stdout, fakeStdout)
def setUp(self):
    rootLogger = logging.getLogger("")
    originalLevel = rootLogger.getEffectiveLevel()
    rootLogger.setLevel(logging.DEBUG)

    @self.addCleanup
    def restoreLevel():
        rootLogger.setLevel(originalLevel)

    self.hdlr, self.out = handlerAndBytesIO()
    rootLogger.addHandler(self.hdlr)

    @self.addCleanup
    def removeLogger():
        rootLogger.removeHandler(self.hdlr)
        self.hdlr.close()

    self.lp = log.LogPublisher()
    self.obs = log.PythonLoggingObserver()
    self.lp.addObserver(self.obs.emit)
def test_startStopObserver(self):
    """
    Test that start and stop methods of the observer actually register
    and unregister to the log system.
    """
    oldAddObserver = log.addObserver
    oldRemoveObserver = log.removeObserver
    l = []
    try:
        log.addObserver = l.append
        log.removeObserver = l.remove
        obs = log.PythonLoggingObserver()
        obs.start()
        self.assertEqual(l[0], obs.emit)
        obs.stop()
        self.assertEqual(len(l), 0)
    finally:
        log.addObserver = oldAddObserver
        log.removeObserver = oldRemoveObserver
def test_failureLogger(self):
    """
    The reason argument passed to log.err() appears in the report
    generated by DefaultObserver.
    """
    self.catcher = []
    self.observer = self.catcher.append
    log.addObserver(self.observer)
    self.addCleanup(log.removeObserver, self.observer)

    obs = log.DefaultObserver()
    obs.stderr = StringIO()
    obs.start()
    self.addCleanup(obs.stop)

    reason = "The reason."
    log.err(Exception(), reason)
    errors = self.flushLoggedErrors()

    self.assertIn(reason, obs.stderr.getvalue())
    self.assertEqual(len(errors), 1)
def test_logErrorLogsErrorNoRepr(self):
    """
    The text logged by L{defer.logError} has no repr of the failure.
    """
    output = []

    def emit(eventDict):
        output.append(log.textFromEventDict(eventDict))

    log.addObserver(emit)

    error = failure.Failure(RuntimeError())
    defer.logError(error)
    self.flushLoggedErrors(RuntimeError)
    self.assertTrue(output[0].startswith("Unhandled Error\nTraceback "))
def test_quiet(self):
    """
    L{TLSMemoryBIOFactory.doStart} and L{TLSMemoryBIOFactory.doStop} do
    not log any messages.
    """
    contextFactory = ServerTLSContext()

    logs = []
    logger = logs.append
    log.addObserver(logger)
    self.addCleanup(log.removeObserver, logger)

    wrappedFactory = ServerFactory()
    # Disable logging on the wrapped factory:
    wrappedFactory.doStart = lambda: None
    wrappedFactory.doStop = lambda: None

    factory = TLSMemoryBIOFactory(contextFactory, False, wrappedFactory)
    factory.doStart()
    factory.doStop()
    self.assertEqual(logs, [])
def test_error_handling():
    """
    Check that exceptions in json_GET result in a 500 response code.
    """
    def err_observer(event):
        # type: (dict) -> None
        assert event["isError"]
        failure = event["failure"]
        assert isinstance(failure, Failure)
        exception = failure.value
        assert isinstance(exception, TypeError)
        err_observer.called = True

    log.addObserver(err_observer)

    for resource in [SyncBrokenPage(), AsyncBrokenPage()]:
        request = MyDummyRequest([b"/"])
        err_observer.called = False
        yield _render(resource, request)
        assert request.responseCode == 500
        assert err_observer.called is True, \
            "Error handler not called for {}".format(type(resource).__name__)

    log.removeObserver(err_observer)
def setUp(self):
    self.ports = []
    self.messages = []
    log.addObserver(self.messages.append)
def setUp(self):
    self.catcher = []
    log.addObserver(self.catcher.append)
def testErroneousErrors(self):
    L1 = []
    L2 = []
    log.addObserver(lambda events: L1.append(events))
    log.addObserver(lambda events: 1/0)
    log.addObserver(lambda events: L2.append(events))
    log.msg("Howdy, y'all.")

    # XXX - use private _flushErrors so we don't also catch
    # the deprecation warnings
    excs = [f.type for f in log._flushErrors(ZeroDivisionError)]
    self.assertEquals([ZeroDivisionError], excs)

    self.assertEquals(len(L1), 2)
    self.assertEquals(len(L2), 2)

    self.assertEquals(L1[1]['message'], ("Howdy, y'all.",))
    self.assertEquals(L2[0]['message'], ("Howdy, y'all.",))

    # The observer has been removed, there should be no exception
    log.msg("Howdy, y'all.")

    self.assertEquals(len(L1), 3)
    self.assertEquals(len(L2), 3)

    self.assertEquals(L1[2]['message'], ("Howdy, y'all.",))
    self.assertEquals(L2[2]['message'], ("Howdy, y'all.",))
def setUp(self):
    # Fuck you Python.
    reload(sys)
    self._origEncoding = sys.getdefaultencoding()
    sys.setdefaultencoding('ascii')

    self.out = FakeFile()
    self.lp = log.LogPublisher()
    self.flo = log.FileLogObserver(self.out)
    self.lp.addObserver(self.flo.emit)
def setUp(self):
    self.c = []
    log.addObserver(self.c.append)
def _setUpLogWarnings(self):
    if self._logWarnings:
        return

    def seeWarnings(x):
        if x.has_key('warning'):
            print
            print x['format'] % x

    log.addObserver(seeWarnings)
    self._logWarnings = True
def addFilteredObserver(cls, observer):
    log.addObserver(FilteringLogObserver(
        observer, [cls.filterPredicate]
    ))
def _getLogObserver(self):
    """
    Create and return a suitable log observer for the given configuration.

    The observer will go to syslog using the prefix C{_syslogPrefix} if
    C{_syslog} is true.  Otherwise, it will go to the file named
    C{_logfilename} or, if C{_nodaemon} is true and C{_logfilename} is
    C{"-"}, to stdout.

    @return: An object suitable to be passed to C{log.addObserver}.
    """
    if self._syslog:
        # FIXME: Requires twisted.python.syslog to be ported to Py3
        # https://twistedmatrix.com/trac/ticket/7957
        from twisted.python import syslog
        return syslog.SyslogObserver(self._syslogPrefix).emit

    if self._logfilename == '-':
        if not self._nodaemon:
            sys.exit('Daemons cannot log to stdout, exiting!')
        logFile = sys.stdout
    elif self._nodaemon and not self._logfilename:
        logFile = sys.stdout
    else:
        if not self._logfilename:
            self._logfilename = 'twistd.log'
        logFile = logfile.LogFile.fromFullPath(self._logfilename)
        try:
            import signal
        except ImportError:
            pass
        else:
            # Override if signal is set to None or SIG_DFL (0)
            if not signal.getsignal(signal.SIGUSR1):
                def rotateLog(signal, frame):
                    from twisted.internet import reactor
                    reactor.callFromThread(logFile.rotate)
                signal.signal(signal.SIGUSR1, rotateLog)

    return logger.textFileLogObserver(logFile)
def test_ignored1XXResponseCausesLog(self):
    """
    When a 1XX response is ignored, Twisted emits a log.
    """
    sample103Response = (
        b'HTTP/1.1 103 Early Hints\r\n'
        b'Server: socketserver/1.0.0\r\n'
        b'Link: </other/styles.css>; rel=preload; as=style\r\n'
        b'Link: </other/action.js>; rel=preload; as=script\r\n'
        b'\r\n'
    )

    # Catch the logs.
    logs = []
    log.addObserver(logs.append)
    self.addCleanup(log.removeObserver, logs.append)

    protocol = HTTPClientParser(
        Request(b'GET', b'/', _boringHeaders, None),
        lambda ign: None
    )
    protocol.makeConnection(StringTransport())
    protocol.dataReceived(sample103Response)

    self.assertEqual(
        logs[0]['message'][0],
        'Ignoring unexpected 103 response'
    )
def observe(self):
    loggedMessages = []
    log.addObserver(loggedMessages.append)
    self.addCleanup(log.removeObserver, loggedMessages.append)
    return loggedMessages
def stopOnError(case, reactor, publisher=None):
    """
    Stop the reactor as soon as any error is logged on the given publisher.

    This is beneficial for tests which will wait for a L{Deferred} to fire
    before completing (by passing or failing).  Certain implementation bugs
    may prevent the L{Deferred} from firing with any result at all (consider
    a protocol's {dataReceived} method that raises an exception: this
    exception is logged but it won't ever cause a L{Deferred} to fire).  In
    that case the test would have to complete by timing out which is a much
    less desirable outcome than completing as soon as the unexpected error
    is encountered.

    @param case: A L{SynchronousTestCase} to use to clean up the necessary
        log observer when the test is over.

    @param reactor: The reactor to stop.

    @param publisher: A L{LogPublisher} to watch for errors.  If L{None},
        the global log publisher will be watched.
    """
    if publisher is None:
        from twisted.python import log as publisher
    running = [None]

    def stopIfError(event):
        if running and event.get('isError'):
            running.pop()
            reactor.stop()

    publisher.addObserver(stopIfError)
    case.addCleanup(publisher.removeObserver, stopIfError)
def test_stderrSkip(self):
    """
    When the _errFlag is set to L{StandardErrorBehavior.DROP},
    L{endpoints._WrapIProtocol} ignores stderr.
    """
    self.ep._errFlag = StandardErrorBehavior.DROP
    d = self.ep.connect(self.factory)
    self.successResultOf(d)
    wpp = self.reactor.processProtocol
    log.addObserver(self._stdLog)
    self.addCleanup(log.removeObserver, self._stdLog)

    wpp.childDataReceived(2, b'stderr2')
    self.assertIsNone(self.eventLog)
def setUp(self):
    self.catcher = []
    self.observer = self.catcher.append
    log.addObserver(self.observer)
    self.addCleanup(log.removeObserver, self.observer)
def test_erroneousErrors(self):
    """
    Exceptions raised by log observers are logged but the observer which
    raised the exception remains registered with the publisher.  These
    exceptions do not prevent the event from being sent to other observers
    registered with the publisher.
    """
    L1 = []
    L2 = []

    def broken(event):
        1 // 0

    for observer in [L1.append, broken, L2.append]:
        log.addObserver(observer)
        self.addCleanup(log.removeObserver, observer)

    for i in range(3):
        # Reset the lists for simpler comparison.
        L1[:] = []
        L2[:] = []

        # Send out the event which will break one of the observers.
        log.msg("Howdy, y'all.", log_trace=[])

        # The broken observer should have caused this to be logged.
        excs = self.flushLoggedErrors(ZeroDivisionError)
        del self.catcher[:]
        self.assertEqual(len(excs), 1)

        # Both other observers should have seen the message.
        self.assertEqual(len(L1), 2)
        self.assertEqual(len(L2), 2)

        # The first event is delivered to all observers; then, errors
        # are delivered.
        self.assertEqual(L1[0]['message'], ("Howdy, y'all.",))
        self.assertEqual(L2[0]['message'], ("Howdy, y'all.",))
def test_showwarning(self):
    """
    L{twisted.python.log.showwarning} emits the warning as a message
    to the Twisted logging system.
    """
    publisher = log.LogPublisher()
    publisher.addObserver(self.observer)

    publisher.showwarning(
        FakeWarning("unique warning message"), FakeWarning,
        "warning-filename.py", 27)
    event = self.catcher.pop()
    self.assertEqual(
        event['format'] % event,
        'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
        'unique warning message')
    self.assertEqual(self.catcher, [])

    # Python 2.6 requires that any function used to override the
    # warnings.showwarning API accept a "line" parameter or a
    # deprecation warning is emitted.
    publisher.showwarning(
        FakeWarning("unique warning message"), FakeWarning,
        "warning-filename.py", 27, line=object())
    event = self.catcher.pop()
    self.assertEqual(
        event['format'] % event,
        'warning-filename.py:27: twisted.test.test_log.FakeWarning: '
        'unique warning message')
    self.assertEqual(self.catcher, [])
def test_emitPrefix(self):
    """
    FileLogObserver.emit() will add a timestamp and system prefix to its
    file output.
    """
    output = StringIO()
    flo = log.FileLogObserver(output)
    events = []

    def observer(event):
        # Capture the event for reference and pass it along to flo
        events.append(event)
        flo.emit(event)

    publisher = log.LogPublisher()
    publisher.addObserver(observer)

    publisher.msg("Hello!")
    self.assertEqual(len(events), 1)
    event = events[0]

    result = output.getvalue()
    prefix = "{time} [{system}] ".format(
        time=flo.formatTime(event["time"]),
        system=event["system"],
    )

    self.assertTrue(
        result.startswith(prefix),
        "{0!r} does not start with {1!r}".format(result, prefix)
    )
def setUp(self):
    self.resultLogs = []
    log.addObserver(self.resultLogs.append)
def _add(self):
    if self._added == 0:
        log.addObserver(self.gotEvent)
    self._added += 1
def assertLogMessage(testCase, expectedMessages, callable, *args, **kwargs):
    """
    Assert that the callable logs the expected messages when called.

    XXX: Put this somewhere where it can be re-used elsewhere. See #6677.

    @param testCase: The test case controlling the test which triggers the
        logged messages and on which assertions will be called.
    @type testCase: L{unittest.SynchronousTestCase}

    @param expectedMessages: A L{list} of the expected log messages
    @type expectedMessages: L{list}

    @param callable: The function which is expected to produce the
        C{expectedMessages} when called.
    @type callable: L{callable}

    @param args: Positional arguments to be passed to C{callable}.
    @type args: L{list}

    @param kwargs: Keyword arguments to be passed to C{callable}.
    @type kwargs: L{dict}
    """
    loggedMessages = []
    log.addObserver(loggedMessages.append)
    testCase.addCleanup(log.removeObserver, loggedMessages.append)
    callable(*args, **kwargs)
    testCase.assertEqual(
        [m['message'][0] for m in loggedMessages],
        expectedMessages)
def enable_sentry_reporting(client):
    # type: (Client) -> None
    """
    Enable Sentry logging for any errors reported via twisted.python.log.err.

    :param client: Already configured raven.Client
    """
    global raven_client
    raven_client = client
    log.addObserver(log_to_sentry)
def test_startLoggingOverridesWarning(self):
    """
    startLogging() overrides global C{warnings.showwarning} such that
    warnings go to Twisted log observers.
    """
    self._startLoggingCleanup()
    newPublisher = NewLogPublisher()

    class SysModule(object):
        stdout = object()
        stderr = object()

    tempLogPublisher = LogPublisher(
        newPublisher, newPublisher,
        logBeginner=LogBeginner(newPublisher, StringIO(), SysModule,
                                warnings)
    )
    # Trial reports warnings in two ways.  First, it intercepts the global
    # 'showwarning' function *itself*, after starting logging (by way of
    # the '_collectWarnings' function which collects all warnings as a
    # around the test's 'run' method).  Second, it has a log observer which
    # immediately reports warnings when they're propagated into the log
    # system (which, in normal operation, happens only at the end of the
    # test case).  In order to avoid printing a spurious warning in this
    # test, we first replace the global log publisher's 'showwarning' in
    # the module with our own.
    self.patch(log, "theLogPublisher", tempLogPublisher)
    # And, one last thing, pretend we're starting from a fresh import, or
    # warnings.warn won't be patched at all.
    log._oldshowwarning = None

    # Global mutable state is bad, kids.  Stay in school.
    fakeFile = StringIO()
    # We didn't previously save log messages, so let's make sure we don't
    # save them any more.
    evt = {"pre-start": "event"}
    received = []

    def preStartObserver(x):
        if 'pre-start' in x.keys():
            received.append(x)

    newPublisher(evt)
    newPublisher.addObserver(preStartObserver)
    log.startLogging(fakeFile, setStdout=False)
    self.addCleanup(tempLogPublisher._stopLogging)
    self.assertEqual(received, [])
    warnings.warn("hello!")
    output = fakeFile.getvalue()
    self.assertIn("UserWarning: hello!", output)
def resumeProducingRaises(self, consumer, expectedExceptions):
    """
    Common implementation for tests where the underlying producer throws
    an exception when its resumeProducing is called.
    """
    class ThrowingProducer(NonStreamingProducer):

        def resumeProducing(self):
            if self.counter == 2:
                return 1/0
            else:
                NonStreamingProducer.resumeProducing(self)

    nsProducer = ThrowingProducer(consumer)
    streamingProducer = _PullToPush(nsProducer, consumer)
    consumer.registerProducer(streamingProducer, True)

    # Register log observer:
    loggedMsgs = []
    log.addObserver(loggedMsgs.append)
    self.addCleanup(log.removeObserver, loggedMsgs.append)

    # Make consumer unregister do what TLSMemoryBIOProtocol would do:
    def unregister(orig=consumer.unregisterProducer):
        orig()
        streamingProducer.stopStreaming()
    consumer.unregisterProducer = unregister

    # Start streaming:
    streamingProducer.startStreaming()

    done = streamingProducer._coopTask.whenDone()
    done.addErrback(lambda reason: reason.trap(TaskStopped))

    def stopped(ign):
        self.assertEqual(consumer.value(), b"01")
        # Any errors from resumeProducing were logged:
        errors = self.flushLoggedErrors()
        self.assertEqual(len(errors), len(expectedExceptions))
        for f, (expected, msg), logMsg in zip(
                errors, expectedExceptions, loggedMsgs):
            self.assertTrue(f.check(expected))
            self.assertIn(msg, logMsg['why'])
        # And the streaming wrapper stopped:
        self.assertTrue(streamingProducer._finished)
    done.addCallback(stopped)
    return done
def start(self, logfile=None, application_name="ooniprobe"):
    from ooni.settings import config

    if not logfile:
        logfile = os.path.expanduser(config.basic.logfile)

    log_folder = os.path.dirname(logfile)
    if (not os.access(log_folder, os.W_OK) or
            (os.path.exists(logfile) and not os.access(logfile, os.W_OK))):
        # If we don't have permissions to write to the log_folder or
        # logfile.
        log_folder = config.running_path
        logfile = os.path.join(log_folder, "ooniprobe.log")

    self.log_filepath = logfile
    mkdir_p(log_folder)
    log_filename = os.path.basename(logfile)

    file_log_level = levels.get(config.basic.loglevel, levels['INFO'])
    stdout_log_level = levels['INFO']
    if config.advanced.debug:
        stdout_log_level = levels['DEBUG']

    if config.basic.rotate == 'daily':
        logfile = MyDailyLogFile(log_filename, log_folder)
    elif config.basic.rotate == 'length':
        logfile = LogFile(log_filename, log_folder,
                          rotateLength=int(human_size_to_bytes(
                              config.basic.rotate_length
                          )),
                          maxRotatedFiles=config.basic.max_rotated_files)
    else:
        logfile = open(os.path.join(log_folder, log_filename), 'a')

    self.fileObserver = MsecLogObserver(logfile, log_level=file_log_level)
    self.stdoutObserver = StdoutStderrObserver(sys.stdout,
                                               log_level=stdout_log_level)

    tw_log.startLoggingWithObserver(self.fileObserver.emit)
    tw_log.addObserver(self.stdoutObserver.emit)

    tw_log.msg("Starting %s on %s (%s UTC)" % (application_name,
                                               otime.prettyDateNow(),
                                               otime.prettyDateNowUTC()))