我们从 Python 开源项目中提取了以下 39 个代码示例，用于说明如何使用模块级标志 logging.raiseExceptions（它是一个布尔属性，而不是可调用的函数）。
def callHandlers(self, record):
    """
    Dispatch *record* to the handlers of this logger and its ancestors.

    Mirrors Python 3.5's ``logging.Logger.callHandlers``: the hierarchy is
    walked upwards through ``parent`` until a logger with a false
    ``propagate`` has been processed. Every handler is counted, but only
    handlers whose level does not exceed ``record.levelno`` receive the
    record.

    When no handler was encountered at all, ``logging.lastResort`` is used
    if it is set; otherwise, provided ``logging.raiseExceptions`` is true, a
    one-off "No handlers could be found" message is written to
    ``sys.stderr``.
    """
    handler_count = 0
    current = self
    while current:
        for handler in current.handlers:
            handler_count += 1
            if record.levelno >= handler.level:
                handler.handle(record)
        # A non-propagating logger is the last one whose handlers are called.
        current = current.parent if current.propagate else None

    if handler_count:
        return

    fallback = logging.lastResort
    if fallback:
        if record.levelno >= fallback.level:
            fallback.handle(record)
    elif logging.raiseExceptions and not self.manager.emittedNoHandlerWarning:
        sys.stderr.write("No handlers could be found for logger"
                         " \"%s\"\n" % self.name)
        # Remember that the warning was emitted so it is printed only once.
        self.manager.emittedNoHandlerWarning = True
def test_execute_unicode_logs(self, client_class_mock):
    # Run the operator against a mocked Docker client whose log output is a
    # text line, with logging.raiseExceptions forced on, and check that
    # traceback.print_exception was never invoked.
    client_mock = mock.Mock(spec=Client)
    client_mock.create_container.return_value = {'Id': 'some_id'}
    client_mock.create_host_config.return_value = mock.Mock()
    client_mock.images.return_value = []
    # NOTE(review): the trailing '??' looks like non-ASCII characters that
    # were lost in transit; confirm against the upstream test.
    client_mock.logs.return_value = ['unicode container log ??']
    client_mock.pull.return_value = []
    client_mock.wait.return_value = 0

    client_class_mock.return_value = client_mock

    originalRaiseExceptions = logging.raiseExceptions
    logging.raiseExceptions = True
    try:
        operator = DockerOperator(image='ubuntu', owner='unittest', task_id='unittest')

        with mock.patch('traceback.print_exception') as print_exception_mock:
            operator.execute(None)
    finally:
        # Restore the global flag even when the operator raises, so a
        # failure here cannot leak state into later tests.
        logging.raiseExceptions = originalRaiseExceptions

    print_exception_mock.assert_not_called()
def test_error_handling(self):
    # Three scenarios against a stream that fails on use:
    #   1. TestStreamHandler records the offending record on error_record.
    #   2. A plain StreamHandler reports the RuntimeError on sys.stderr.
    #   3. With logging.raiseExceptions off, nothing is written to stderr.
    handler = TestStreamHandler(BadStream())
    record = logging.makeLogRecord({})
    saved_raise = logging.raiseExceptions
    saved_stderr = sys.stderr
    try:
        handler.handle(record)
        self.assertIs(handler.error_record, record)

        handler = logging.StreamHandler(BadStream())
        captured = io.StringIO()
        sys.stderr = captured
        handler.handle(record)
        self.assertIn('\nRuntimeError: deliberate mistake\n',
                      captured.getvalue())

        logging.raiseExceptions = False
        captured = io.StringIO()
        sys.stderr = captured
        handler.handle(record)
        self.assertEqual('', captured.getvalue())
    finally:
        # Always put the global flag and the real stderr back.
        logging.raiseExceptions = saved_raise
        sys.stderr = saved_stderr

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
def test_last_resort(self):
    # Exercise what happens to a warning on the root logger once its only
    # handler has been removed: last-resort output, the one-off
    # "No handlers" message, and silence when raiseExceptions is off.
    root = self.root_logger
    root.removeHandler(self.root_hdlr)
    saved_stderr = sys.stderr
    saved_last_resort = logging.lastResort
    saved_raise = logging.raiseExceptions

    def warn_and_capture():
        # Point sys.stderr at a fresh buffer, log once, return the output.
        sys.stderr = buf = io.StringIO()
        root.warning('This is your final chance!')
        return buf.getvalue()

    try:
        # The last-resort handler prints the bare message.
        self.assertEqual(warn_and_capture(), 'This is your final chance!\n')

        # No handlers and no last resort: the 'No handlers' message.
        logging.lastResort = None
        self.assertEqual(warn_and_capture(),
                         'No handlers could be found for logger "root"\n')

        # The 'No handlers' message is printed only once.
        self.assertEqual(warn_and_capture(), '')

        # With raiseExceptions False no message is printed, even after the
        # "already warned" marker has been reset.
        root.manager.emittedNoHandlerWarning = False
        logging.raiseExceptions = False
        self.assertEqual(warn_and_capture(), '')
    finally:
        sys.stderr = saved_stderr
        root.addHandler(self.root_hdlr)
        logging.lastResort = saved_last_resort
        logging.raiseExceptions = saved_raise
def log(self, level, msg, *args, **kwargs):
    """
    Log *msg* at a caller-supplied integer level, e.g. INFO_V0 to INFO_V5.

    A non-integer *level* raises ``TypeError`` when
    ``logging.raiseExceptions`` is true and is silently ignored otherwise.
    For an integer level that ``self.logger`` has enabled, the message is
    passed through ``self._process_msg`` and emitted via ``self._log``.
    """
    if isinstance(level, int):
        if self.logger.isEnabledFor(level):
            processed, extra = self._process_msg(msg, *args, **kwargs)
            self._log(level, processed, **extra)
    elif logging.raiseExceptions:
        raise TypeError("level must be an integer")
def callHandlers(self, record):
    """
    Pass a record to all relevant handlers.

    Walks this logger and then its parents, stopping after the first logger
    whose ``propagate`` attribute is false. Handler selection differs by
    handler name: the handler named 'console' receives every record at or
    above its level, while any other handler receives only records whose
    level equals its own level exactly.

    If no handler at all was encountered, a one-off error message is
    written to sys.stderr (subject to ``raiseExceptions``).
    """
    logger = self
    handler_count = 0
    while logger:
        for handler in logger.handlers:
            handler_count += 1
            if handler.name == 'console':
                accepted = record.levelno >= handler.level
            else:
                accepted = record.levelno == handler.level
            if accepted:
                handler.handle(record)
        # Stop after a non-propagating logger, otherwise climb to the parent.
        logger = logger.parent if logger.propagate else None

    already_warned = self.manager.emittedNoHandlerWarning if (
        handler_count == 0 and raiseExceptions) else True
    if not already_warned:
        sys.stderr.write("No handlers could be found for logger"
                         " \"%s\"\n" % self.name)
        self.manager.emittedNoHandlerWarning = 1
def test_last_resort(self):
    # Exercise what happens to records on the root logger once its only
    # handler has been removed: last-resort output (warnings only), the
    # one-off "No handlers" message, and silence when raiseExceptions is off.
    root = self.root_logger
    root.removeHandler(self.root_hdlr)
    saved_stderr = sys.stderr
    saved_last_resort = logging.lastResort
    saved_raise = logging.raiseExceptions

    def warn_and_capture():
        # Point sys.stderr at a fresh buffer, log once, return the output.
        sys.stderr = buf = io.StringIO()
        root.warning('This is your final chance!')
        return buf.getvalue()

    try:
        # Both checks below share one buffer: the debug record must leave
        # it empty, the warning must then appear in it.
        sys.stderr = shared = io.StringIO()
        root.debug('This should not appear')
        self.assertEqual(shared.getvalue(), '')
        root.warning('This is your final chance!')
        self.assertEqual(shared.getvalue(), 'This is your final chance!\n')

        # No handlers and no last resort: the 'No handlers' message.
        logging.lastResort = None
        self.assertEqual(warn_and_capture(),
                         'No handlers could be found for logger "root"\n')

        # The 'No handlers' message is printed only once.
        self.assertEqual(warn_and_capture(), '')

        # With raiseExceptions False no message is printed, even after the
        # "already warned" marker has been reset.
        root.manager.emittedNoHandlerWarning = False
        logging.raiseExceptions = False
        self.assertEqual(warn_and_capture(), '')
    finally:
        sys.stderr = saved_stderr
        root.addHandler(self.root_hdlr)
        logging.lastResort = saved_last_resort
        logging.raiseExceptions = saved_raise
def setUp(self):
    # Start each test with an empty ``called`` list and make sure that
    # whatever the test does to logging.raiseExceptions is undone afterwards.
    super(ShutdownTest, self).setUp()
    self.called = []
    # The current value is captured now and written back by the cleanup.
    self.addCleanup(setattr, logging, 'raiseExceptions',
                    logging.raiseExceptions)
def test_with_other_error_in_acquire_without_raise(self):
    # Turn logging.raiseExceptions off, then run the shared helper with an
    # IndexError associated with 'acquire'.
    # NOTE(review): presumably the helper verifies the error is swallowed;
    # the helper itself is not visible here - confirm.
    logging.raiseExceptions = False
    self._test_with_failure_in_method('acquire', IndexError)
def test_with_other_error_in_flush_without_raise(self):
    # Turn logging.raiseExceptions off, then run the shared helper with an
    # IndexError associated with 'flush'.
    # NOTE(review): presumably the helper verifies the error is swallowed;
    # the helper itself is not visible here - confirm.
    logging.raiseExceptions = False
    self._test_with_failure_in_method('flush', IndexError)
def test_with_other_error_in_acquire_with_raise(self):
    # With logging.raiseExceptions on, the shared helper is expected to let
    # the IndexError associated with 'acquire' escape.
    logging.raiseExceptions = True
    with self.assertRaises(IndexError):
        self._test_with_failure_in_method('acquire', IndexError)
def test_with_other_error_in_flush_with_raise(self):
    # With logging.raiseExceptions on, the shared helper is expected to let
    # the IndexError associated with 'flush' escape.
    logging.raiseExceptions = True
    with self.assertRaises(IndexError):
        self._test_with_failure_in_method('flush', IndexError)
def test_with_other_error_in_close_with_raise(self):
    # With logging.raiseExceptions on, the shared helper is expected to let
    # the IndexError associated with 'close' escape.
    logging.raiseExceptions = True
    with self.assertRaises(IndexError):
        self._test_with_failure_in_method('close', IndexError)
def test_log_invalid_level_with_raise(self):
    # A non-integer level must raise TypeError while
    # logging.raiseExceptions is true.
    old_raise = logging.raiseExceptions
    # The attribute name must be spelled exactly 'raiseExceptions';
    # a misspelling would create a stray attribute on the logging module
    # and leave the real flag modified for every later test.
    self.addCleanup(setattr, logging, 'raiseExceptions', old_raise)

    logging.raiseExceptions = True
    self.assertRaises(TypeError, self.logger.log, '10', 'test message')
def test_log_invalid_level_no_raise(self):
    # A non-integer level must be ignored silently while
    # logging.raiseExceptions is false.
    old_raise = logging.raiseExceptions
    # The attribute name must be spelled exactly 'raiseExceptions';
    # a misspelling would create a stray attribute on the logging module
    # and leave the real flag modified for every later test.
    self.addCleanup(setattr, logging, 'raiseExceptions', old_raise)

    logging.raiseExceptions = False
    self.logger.log('10', 'test message')  # no exception happens
def start_logging(out=_stdout, level='info'):
    """
    Begin logging.

    The level is validated on every call; everything after that happens
    only on the first call, later calls return immediately.

    :param out: a file-like object to log to; it is handed to
        ``_TxaioFileHandler``. Defaults to the module-level ``_stdout``.
    :param level: the maximum log-level to emit (a string); must be one of
        ``log_levels``.
    :raises RuntimeError: if ``level`` is not in ``log_levels``.
    """
    global _log_level, _loggers, _started_logging

    if level not in log_levels:
        valid = ', '.join(log_levels)
        raise RuntimeError(
            "Invalid log level '{0}'; valid are: {1}".format(level, valid)
        )
    if _started_logging:
        return

    _started_logging, _log_level = True, level

    # note: no basicConfig() or similar is needed, because the root logger
    # gets at least one handler right here.
    root = logging.getLogger()
    root.addHandler(_TxaioFileHandler(out))

    logging.raiseExceptions = True  # FIXME

    # 'trace' has no stdlib counterpart, so it shares DEBUG.
    stdlib_levels = {
        'critical': logging.CRITICAL,
        'error': logging.ERROR,
        'warn': logging.WARNING,
        'info': logging.INFO,
        'debug': logging.DEBUG,
        'trace': logging.DEBUG,
    }
    root.setLevel(stdlib_levels[level])

    # Loggers created before now need their level set explicitly (any
    # created after now will get it from _log_level).
    for existing in _loggers:
        existing._set_log_level(level)
def test_log_invalid_level_with_raise(self):
    # While logging.raiseExceptions is swapped to True, logging with a
    # non-integer level must raise TypeError.
    with swap_attr(logging, 'raiseExceptions', True), \
            self.assertRaises(TypeError):
        self.logger.log('10', 'test message')
def test_log_invalid_level_no_raise(self):
    # While logging.raiseExceptions is swapped to False, logging with a
    # non-integer level must complete without raising.
    with swap_attr(logging, 'raiseExceptions', False):
        self.logger.log('10', 'test message')  # no exception happens
def setupLogger(console=True, File=False, Variable=False, Filebackupcount=0):
    '''
    Setup a logger for the application.

    Configures the module-level ``logger``: sets it to DEBUG, disables
    ``logging.raiseExceptions`` and attaches the requested handlers, all
    sharing one formatter. Handler thresholds depend on ``args.debug``.

    :param console: attach a stderr StreamHandler (DEBUG when debugging,
        ERROR otherwise).
    :param File: attach a RotatingFileHandler on LOG_FILENAME (DEBUG when
        debugging, INFO otherwise). If the log file already existed it is
        rolled over once.
    :param Variable: attach a StreamHandler writing to the module-level
        ``log_capture_string`` (DEBUG when debugging, INFO otherwise).
    :param Filebackupcount: number of rolled-over log files to keep;
        converted with ``int()``.
    :return: Nothing
    '''
    logging.raiseExceptions = False
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(message)s')

    # Must be checked before the file handler is created, because creating
    # the handler may create the file.
    needRoll = os.path.isfile(LOG_FILENAME)

    fh = None
    if File:
        # File handler that keeps backups of old logs.
        fh = logging.handlers.RotatingFileHandler(
            LOG_FILENAME, backupCount=int(Filebackupcount))
        # TODO: Change this to ERROR after dev
        fh.setLevel(logging.DEBUG if args.debug else logging.INFO)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

    if Variable:
        # In-memory handler so the log can be read on the fly.
        vh = logging.StreamHandler(log_capture_string)
        vh.setLevel(logging.DEBUG if args.debug else logging.INFO)
        vh.setFormatter(formatter)
        logger.addHandler(vh)

    if console:
        # Console handler with a higher threshold outside debug mode.
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG if args.debug else logging.ERROR)
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # This is a stale log, so roll it
    if fh is not None and needRoll:
        # Add timestamp
        logger.debug(_('\n---------\nLog closed on {0}.\n---------\n').format(time.asctime()))
        # Roll over on application start. The handler created above is
        # used directly rather than logger.handlers[0], which is a
        # different handler whenever the logger already had handlers.
        fh.doRollover()

    # Add timestamp
    logger.debug(_('\n---------\nLog started on {0}.\n---------\n').format(time.asctime()))
def setup_logger(cls, args, logging_queue=None, main=False):
    """
    Build the list of log handlers from *args* and install them with
    ``logging.basicConfig``.

    NOTE(review): the nesting below was reconstructed from a copy of this
    function whose indentation had been lost; confirm the extent of the
    ``else`` and ``if main`` branches against the original file.

    :param args: an ``argparse.Namespace``; its ``debug`` and ``dev``
        attributes are read.
    :param logging_queue: when truthy, stored on ``cls._queue``.
    :param main: when truthy, the trailing ``Logger(...)`` calls run.
    """
    assert isinstance(args, argparse.Namespace)
    if logging_queue:
        cls._queue = logging_queue
    # Root level: DEBUG with --debug, INFO otherwise.
    log_level = logging.DEBUG if args.debug else logging.INFO
    log_handlers = []
    if not args.dev:
        logging.raiseExceptions = False  # Don't raise exception if in production mode
    if cls._queue:
        # A queue is set: records go to the queue only.
        log_handlers.append(QueueHandler(cls._queue))
    else:
        if args.dev:
            log_handlers.append(logging.StreamHandler())
        if args.debug:
            # Mode 'x' creates the file only when it does not exist yet.
            try:
                with open(constants.log_debug, 'x') as f:
                    pass
            except FileExistsError:
                pass
            # Mode 'w': the debug log is truncated when the handler opens it.
            lg = logging.FileHandler(constants.log_debug, 'w', 'utf-8')
            lg.setLevel(logging.DEBUG)
            log_handlers.append(lg)
        # One rotating file per threshold: normal (INFO) and error (ERROR).
        for log_path, lvl in ((constants.log_normal, logging.INFO), (constants.log_error, logging.ERROR)):
            try:
                with open(log_path, 'x') as f:  # noqa: F841
                    pass
            except FileExistsError:
                pass
            lg = RotatingFileHandler(
                log_path, maxBytes=100000 * 10, encoding='utf-8', backupCount=1)
            lg.setLevel(lvl)
            log_handlers.append(lg)
    logging.basicConfig(
        level=log_level,
        format='%(asctime)-8s %(levelname)-10s %(name)-10s %(message)s',
        datefmt='%d-%m %H:%M',
        handlers=tuple(log_handlers))
    if main:
        if args.dev:
            Logger("sqlalchemy.pool").setLevel(logging.DEBUG)
            Logger("sqlalchemy.engine").setLevel(logging.INFO)
            Logger("sqlalchemy.orm").setLevel(logging.INFO)
        Logger(__name__).i(
            os.path.split(
                constants.log_debug)[1], "created at", os.path.abspath(
                constants.log_debug), stdout=True)
def logger():
    """
    Yield the root logger, set to DEBUG, inside a patched environment.

    While the generator is suspended at its ``yield`` the following are
    patched: ``logging.os.getpid`` (returns 111), ``logging.time.time``
    (returns 946725071.111111), the ``get_localzone`` and
    ``socket.gethostname`` names used by ``rfc5424logging.handler``, and
    ``socket.connect`` / ``socket.sendall`` as seen by ``logging.handlers``.
    The logging level-name tables are patched with ``patch.dict`` so that
    changes made to them are undone afterwards.

    Teardown runs in a ``finally`` block: every patch that was actually
    started is stopped, ``logging.raiseExceptions`` is put back to its
    previous value and ``Rfc5424SysLogAdapter._extra_levels_enabled`` is
    reset to False - also when setup fails partway or the generator is
    closed early.
    """
    patches = [
        patch('logging.os.getpid', return_value=111),
        patch('logging.time.time', return_value=946725071.111111),
        patch('rfc5424logging.handler.get_localzone', return_value=timezone),
        patch('rfc5424logging.handler.socket.gethostname', return_value="testhostname"),
        patch('logging.handlers.socket.socket.connect', side_effect=connect_mock),
        patch('logging.handlers.socket.socket.sendall', side_effect=connect_mock),
    ]
    if '_levelNames' in logging.__dict__:
        # Python 2.7
        patches.append(patch.dict(logging._levelNames))
    else:
        # Python 3.x
        patches.append(patch.dict(logging._levelToName))
        patches.append(patch.dict(logging._nameToLevel))

    started = []
    previous_raise = logging.raiseExceptions
    try:
        for item in patches:
            item.start()
            # Track only what really started, so teardown never calls
            # stop() on a patch that failed to start.
            started.append(item)

        logging.raiseExceptions = True
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        yield root
    finally:
        for item in started:
            item.stop()
        logging.raiseExceptions = previous_raise
        Rfc5424SysLogAdapter._extra_levels_enabled = False