我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 logging.Filter()。
def setup_logging(verbose=0, colors=False, name=None):
    """Configure console logging.

    Records at INFO and below are written to stdout; everything else is
    written to stderr.

    :param int verbose: Verbosity level. > 0 print debug statements. > 1 passed to sphinx-build.
    :param bool colors: Print color text in non-verbose mode.
    :param str name: Which logger name to set handlers to. Used for testing.
    """
    debug = verbose > 0
    target = logging.getLogger(name)
    target.setLevel(logging.DEBUG if debug else logging.INFO)

    formatter = ColorFormatter(debug, colors)
    if colors:
        colorclass.Windows.enable()

    # Anonymous Filter subclass whose static ``filter`` accepts INFO and
    # below. The class object itself (not an instance) is registered on the
    # handler; the static method makes ``cls.filter(record)`` callable.
    info_and_below = type(
        '',
        (logging.Filter,),
        {'filter': staticmethod(lambda r: r.levelno <= logging.INFO)},
    )

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.addFilter(info_and_below)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.WARNING)

    # Both handlers share one formatter; stdout is attached first.
    for console_handler in (stdout_handler, stderr_handler):
        console_handler.setFormatter(formatter)
        target.addHandler(console_handler)
def isIPv(version, ip):
    """Check if **ip** is a certain **version** (IPv4 or IPv6).

    .. warning:: Do *not* put any calls to the logging module in this
        function, or else an infinite recursion will occur when the call is
        made, due to the log :class:`~logging.Filter`s in
        :mod:`~bridgedb.safelog` using this function to validate matches from
        the regular expression for IP addresses.

    :param integer version: The IPv[4|6] version to check; expected to be
        either ``4`` or ``6``. The value is handed to
        :func:`ipaddr.IPAddress` unchanged.
        NOTE(review): an earlier docstring said any other value is silently
        treated as ``4``; nothing in this function enforces that -- confirm
        against the ipaddr library.
    :param ip: The IP address to check. May be an any type which
        :class:`ipaddr.IPAddress` will accept.
    :rtype: boolean
    :returns: ``True`` if :func:`ipaddr.IPAddress` accepts **ip** for the
        requested **version**; ``False`` if it raises.
    """
    try:
        ipaddr.IPAddress(ip, version=version)
    except Exception:
        # Deliberately broad and silent: any parsing failure means "not an
        # address of this version", and logging here would recurse (see the
        # warning above).
        return False
    return True
def test_filter(self):
    """Only records from 'spam.eggs' and its children pass a name filter."""
    name_filter = logging.Filter("spam.eggs")
    handler = self.root_logger.handlers[0]
    logger_names = ("spam", "spam.eggs", "spam.eggs.fish", "spam.bakedbeans")
    try:
        handler.addFilter(name_filter)
        # Create every logger first, then emit one numbered message from
        # each in order; only the second and third should get through.
        loggers = [logging.getLogger(logger_name) for logger_name in logger_names]
        for current in loggers:
            current.info(self.next_message())
        expected_lines = [
            ('spam.eggs', 'INFO', '2'),
            ('spam.eggs.fish', 'INFO', '3'),
        ]
        self.assert_log_lines(expected_lines)
    finally:
        handler.removeFilter(name_filter)
def setUp(self):
    """Install a root-logger filter that rejects records of an unexpected type."""

    class CheckingFilter(logging.Filter):
        # Raises unless each record's exact type is ``cls``; subclasses of
        # ``cls`` are rejected too because the comparison is by identity.

        def __init__(self, cls):
            self.cls = cls

        def filter(self, record):
            actual = type(record)
            if actual is self.cls:
                return True
            raise TypeError('Unexpected LogRecord type %s, expected %s' %
                            (actual, self.cls))

    BaseTest.setUp(self)
    self.filter = CheckingFilter(DerivedLogRecord)
    self.root_logger.addFilter(self.filter)
    # Remember the current record factory alongside the installed filter.
    self.orig_factory = logging.getLogRecordFactory()
def setup_logger(self, logger, level=None):
    """Attach a console handler (and a Graylog handler outside development mode) to a logger.

    :param logger: A :class:`logging.Logger` or the name of one.
    :param level: Level to set on the logger; ``logging.DEBUG`` when ``None``.
    :return: The configured logger.
    """
    if isinstance(logger, str):
        logger = logging.getLogger(logger)

    # NOTE(review): the previous code picked "DEBUG if DEVELOPMENT_MODE else
    # DEBUG" -- both branches were identical, so the default is plainly
    # DEBUG. Confirm whether production was meant to be less verbose.
    logger.setLevel(logging.DEBUG if level is None else level)

    # Restrict every root handler to records from the "gold-digger" logger
    # hierarchy. Skip handlers that already carry this plain name filter so
    # repeated calls do not stack redundant copies.
    for root_handler in logging.root.handlers:
        already_filtered = any(
            type(existing) is logging.Filter and existing.name == "gold-digger"
            for existing in root_handler.filters
        )
        if not already_filtered:
            root_handler.addFilter(logging.Filter("gold-digger"))

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        "[%(levelname)s] %(asctime)s at %(filename)s:%(lineno)d (%(processName)s) -- %(message)s",
        "%Y-%m-%d %H:%M:%S")
    )
    logger.addHandler(console_handler)

    if not settings.DEVELOPMENT_MODE:
        graylog_handler = graypy.GELFHandler(settings.GRAYLOG_ADDRESS, settings.GRAYLOG_PORT)
        logger.addHandler(graylog_handler)

    return logger
def test_load_filters_onlyused(self):
    """Only the filter referenced by a handler ends up loaded."""
    config = {
        "handlers": {"root": {"filters": ["f2"]}},
        "filters": {
            "f1": {"class": "logging.Filter"},
            "f2": {"class": "logging.Filter"},
        },
    }
    self.m_obj.load_config(config)
    self.m_obj._load_filters()

    loaded = self.m_obj.m_filters
    self.assertEqual(len(loaded), 1)
    self.assertEqual(next(iter(loaded.keys())), "f2")
def test_load_filters_noclass(self):
    """Loading filter definitions that lack a "class" key raises XtdError."""
    config = {
        "handlers": {"root": {"filters": ["f2"]}},
        "filters": {
            "f1": {"param": "logging.Filter"},
            "f2": {"param": "logging.Filter"},
        },
    }
    self.m_obj.load_config(config)
    with self.assertRaises(error.XtdError):
        self.m_obj._load_filters()
def SetupLoggers():
    """Configure the root logger to print 'CADIS' records to the console.

    Binds the module-level ``logger`` to the root logger, sets it to DEBUG,
    raises the "requests" and "urllib3" loggers to WARNING, and attaches a
    stream handler that only passes records from the 'CADIS' hierarchy.
    """
    global logger
    logger = logging.getLogger()
    # Module-level call kept as-is; per the logging docs it runs
    # basicConfig() when the root logger has no handlers yet.
    logging.info("testing before")
    logger.setLevel(logging.DEBUG)

    # Rotating file log, currently disabled:
    # logfile = os.path.join(os.path.dirname(__file__), "../../logs/CADIS.log")
    # flog = logging.handlers.RotatingFileHandler(logfile, maxBytes=10*1024*1024, backupCount=50, mode='w')
    # flog.setFormatter(logging.Formatter('%(levelname)s [%(name)s] %(message)s'))
    # logger.addHandler(flog)

    for noisy_name in ("requests", "urllib3"):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)

    console = logging.StreamHandler()
    console.addFilter(logging.Filter(name='CADIS'))
    console.setFormatter(logging.Formatter('[%(name)s] %(message)s'))
    console.setLevel(logging.DEBUG)
    logger.addHandler(console)
def SetupLoggers():
    """Configure the root logger to print 'CADIS' records to the console.

    Binds the module-level ``logger`` to the root logger, sets it to DEBUG,
    raises the "requests" logger to WARNING, and attaches a stream handler
    that only passes records from the 'CADIS' hierarchy.
    """
    global logger
    logger = logging.getLogger()
    # Module-level call kept as-is; per the logging docs it runs
    # basicConfig() when the root logger has no handlers yet.
    logging.info("testing before")
    logger.setLevel(logging.DEBUG)

    # Rotating file log, currently disabled:
    # logfile = os.path.join(os.path.dirname(__file__), "../../logs/CADIS.log")
    # flog = logging.handlers.RotatingFileHandler(logfile, maxBytes=10*1024*1024, backupCount=50, mode='w')
    # flog.setFormatter(logging.Formatter('%(levelname)s [%(name)s] %(message)s'))
    # logger.addHandler(flog)

    requests_logger = logging.getLogger("requests")
    requests_logger.setLevel(logging.WARNING)

    console = logging.StreamHandler()
    console.addFilter(logging.Filter(name='CADIS'))
    console.setFormatter(logging.Formatter('[%(name)s] %(message)s'))
    console.setLevel(logging.DEBUG)
    logger.addHandler(console)
def setupLoggers():
    """Configure the root logger to print 'CADIS' records to the console.

    Binds the module-level ``logger`` to the root logger, sets it to DEBUG,
    raises the "requests" logger to WARNING, and attaches a stream handler
    that only passes records from the 'CADIS' hierarchy.
    """
    global logger
    logger = logging.getLogger()
    # Module-level call kept as-is; per the logging docs it runs
    # basicConfig() when the root logger has no handlers yet.
    logging.info("testing before")
    logger.setLevel(logging.DEBUG)

    # Rotating file log, currently disabled:
    # logfile = os.path.join(os.path.dirname(__file__), "../../logs/CADIS.log")
    # flog = logging.handlers.RotatingFileHandler(logfile, maxBytes=10*1024*1024, backupCount=50, mode='w')
    # flog.setFormatter(logging.Formatter('%(levelname)s [%(name)s] %(message)s'))
    # logger.addHandler(flog)

    requests_logger = logging.getLogger("requests")
    requests_logger.setLevel(logging.WARNING)

    console = logging.StreamHandler()
    console.addFilter(logging.Filter(name='CADIS'))
    console.setFormatter(logging.Formatter('[%(name)s] %(message)s'))
    console.setLevel(logging.DEBUG)
    logger.addHandler(console)
def setup_stream_handlers(conf):
    """Setup logging stream handlers according to the options.

    Replaces every handler on the module-level ``log`` logger with two
    stream handlers: stdout for DEBUG/INFO records (threshold chosen from
    ``conf``) and stderr for WARNING and above.

    :param conf: Object exposing boolean ``debug`` and ``verbose``
        attributes. ``debug`` selects a DEBUG stdout threshold, ``verbose``
        INFO, otherwise WARNING (at which the stdout handler emits nothing,
        since its filter only admits DEBUG and INFO).
    """

    class StdoutFilter(logging.Filter):
        # Admit only DEBUG and INFO; higher severities belong on stderr.
        def filter(self, record):
            return record.levelno in (logging.DEBUG, logging.INFO)

    # Iterate over a copy: removeHandler() mutates log.handlers, and removing
    # while iterating the live list skips every other handler.
    for handler in list(log.handlers):
        log.removeHandler(handler)

    if conf.debug:
        stdout_level = logging.DEBUG
    elif conf.verbose:
        stdout_level = logging.INFO
    else:
        stdout_level = logging.WARNING

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(stdout_level)
    stdout_handler.addFilter(StdoutFilter())
    log.addHandler(stdout_handler)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.WARNING)
    log.addHandler(stderr_handler)
def test_nag_timer(self):
    """Check that a silent child process triggers the periodic nag warnings.

    A filter on the root logger collects the text of every WARNING record
    while the child runs; the collected messages are compared with the
    expected "No output for N seconds" sequence.
    """
    warnings_seen = []
    root = logging.getLogger()

    class _Filter(logging.Filter):
        def filter(self, record):
            if record.levelno == logging.WARNING:
                warnings_seen.append(record.getMessage().lstrip())
            # Always falsy, so no record gets past this filter.
            return 0

    collector = _Filter()
    root.addFilter(collector)
    try:
        proc = subprocess2.Popen(
            self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
        res = proc.communicate(nag_timer=3), proc.returncode
    finally:
        # Always detach the filter so a failure here cannot silence the
        # root logger for later tests.
        root.removeFilter(collector)

    self._check_res(res, 'A\nBB\nCCC\n', None, 0)
    expected = ['No output for 3 seconds from command:', proc.cmd_str,
                'No output for 6 seconds from command:', proc.cmd_str,
                'No output for 9 seconds from command:', proc.cmd_str]
    # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
    self.assertEqual(warnings_seen, expected)
def setup_stream_handlers(options):
    """Setup logging stream handlers according to the options.

    Replaces every handler on the module-level ``log`` logger with two
    stream handlers: stdout for DEBUG/INFO records (threshold chosen from
    ``options``) and stderr for WARNING and above.

    :param options: Object exposing boolean ``debug`` and ``verbose``
        attributes. ``debug`` selects a DEBUG stdout threshold, ``verbose``
        INFO, otherwise WARNING (at which the stdout handler emits nothing,
        since its filter only admits DEBUG and INFO).
    """

    class StdoutFilter(logging.Filter):
        # Admit only DEBUG and INFO; higher severities belong on stderr.
        def filter(self, record):
            return record.levelno in (logging.DEBUG, logging.INFO)

    # Iterate over a copy: removeHandler() mutates log.handlers, and removing
    # while iterating the live list skips every other handler.
    for handler in list(log.handlers):
        log.removeHandler(handler)

    if options.debug:
        stdout_level = logging.DEBUG
    elif options.verbose:
        stdout_level = logging.INFO
    else:
        stdout_level = logging.WARNING

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(stdout_level)
    stdout_handler.addFilter(StdoutFilter())
    log.addHandler(stdout_handler)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.WARNING)
    log.addHandler(stderr_handler)
def __init__(self, name='', lvls='normal', lvldiff=None):
    """Initialise the filter's level list and its show/hide overrides.

    :param name: Passed to ``logging.Filter.__init__``.
    :param lvls: A key of the module-level ``levels`` mapping (its value is
        used), any other non-list, non-``None`` value (wrapped in a
        one-element list), or a list / ``None`` (stored unchanged).
    :param lvldiff: Optional iterable of strings. A ``-`` prefix adds the
        upper-cased remainder to ``lvlhide``; a ``+`` prefix or no prefix
        adds the upper-cased name to ``lvlshow``.
    """
    logging.Filter.__init__(self, name)

    given_as_list = isinstance(lvls, list)
    if not given_as_list and lvls in levels:
        self.levels = levels[lvls]
    elif not given_as_list and lvls is not None:
        self.levels = [lvls]
    else:
        self.levels = lvls

    self.lvlhide = []
    self.lvlshow = []
    for entry in lvldiff or []:
        if entry.startswith('-'):
            self.lvlhide.append(entry[1:].upper())
        else:
            # A leading '+' is optional when asking for a level to be shown.
            shown = entry[1:] if entry.startswith('+') else entry
            self.lvlshow.append(shown.upper())
def pytest_configure():
    """Set up logging for the test run and shorten long SQLAlchemy engine messages.

    Calls ``logging.basicConfig()``, sets the 'sqlalchemy.engine' logger to
    WARNING, and installs a filter on 'sqlalchemy.engine.base.Engine' that
    truncates string messages longer than 500 characters.
    """
    import logging

    logging.basicConfig()
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)

    class Shorten(logging.Filter):
        # Upper bound on the message text kept before '...' is appended.
        max_len = 500

        def filter(self, record):
            # A bare '%r' message carries all of its content in the args;
            # expand it first so the length check sees the real text.
            if record.msg == '%r':
                record.msg = record.msg % record.args
                record.args = ()
            # logging accepts arbitrary objects as ``msg``; only strings can
            # be measured and sliced, so anything else is passed through
            # untouched instead of raising TypeError.
            if isinstance(record.msg, str) and len(record.msg) > self.max_len:
                record.msg = record.msg[:self.max_len] + '...'
            return True

    logging.getLogger('sqlalchemy.engine.base.Engine').addFilter(Shorten())
def configure_filter(self, config):
    """Build a filter from a configuration dictionary.

    When the dictionary has a ``'()'`` key the work is delegated to
    ``self.configure_custom``; otherwise a plain ``logging.Filter`` is made
    from the optional ``'name'`` entry (empty string when absent).
    """
    if '()' in config:
        return self.configure_custom(config)
    return logging.Filter(config.get('name', ''))
def __init__(self, *args, **kwargs):
    """Initialise the base filter and zero the warning/error counters."""
    # The base initialiser is called explicitly rather than through super()
    # because logging.Filter is an old-style class on Python 2.6.
    logging.Filter.__init__(self, *args, **kwargs)
    self.error_count = 0
    self.warning_count = 0
def __init__(self):
    """Initialise the base filter and start with an empty warning table."""
    logging.Filter.__init__(self)
    self.warning_table = dict()
def __init__(self, level):
    """Initialise the base filter and remember ``level`` as ``_level``."""
    logging.Filter.__init__(self)
    self._level = level
def filter(self, record: logging.LogRecord):
    """Decide whether a record passes, based on its ``origin`` prefix.

    With no configured origins (``self.origin is None``) everything passes.
    Otherwise a record matches when ``record.origin`` starts with any
    configured prefix; matches pass unless ``self.exclude`` is set, in
    which case only non-matching records pass.
    """
    if self.origin is None:
        return True
    matched = any(record.origin.startswith(prefix) for prefix in self.origin)
    # Pass exactly when "matched" and "exclude" disagree.
    return matched != bool(self.exclude)
def __init__(self, invert=False):
    """Initialise the base filter, the invert flag and the keyword pattern.

    :param invert: Stored as ``_invert``.
    """
    logging.Filter.__init__(self)
    # Pattern matching any of the words Post, Meta or Process.
    self.exp = re.compile(r"Post|Meta|Process")
    self._invert = invert
def __init__(self, bot):
    """Initialise the base filter and keep a reference to ``bot``."""
    logging.Filter.__init__(self)
    self.bot = bot
def basic_config(logger: logging.Logger = logging.root, level=logging.INFO) -> logging.Logger:
    """
    Configures a logger to log <=INFO to stdout and >INFO to stderr

    Every record the logger lets through goes to exactly one stream,
    including records at custom levels (for example 15 or 25) that do not
    coincide with the standard DEBUG/INFO/WARNING values.

    :param logger: Logger to configure, defaults to logging.root
    :param level: Defaults to INFO
    :return: configured logger (logger from parameters)
    """
    logger.setLevel(level)

    class InfoFilter(logging.Filter):
        # Accept INFO severity and anything below it; an exact match on
        # DEBUG/INFO would silently drop custom levels.
        def filter(self, rec):
            return rec.levelno <= logging.INFO

    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s",
                                  "%d/%m/%Y %H:%M:%S")

    std_out_handler = logging.StreamHandler(sys.stdout)
    # No handler threshold: the filter alone decides what reaches stdout.
    std_out_handler.setLevel(logging.NOTSET)
    std_out_handler.setFormatter(formatter)
    std_out_handler.addFilter(InfoFilter())

    # StreamHandler() without a stream writes to sys.stderr.
    std_err_handler = logging.StreamHandler()
    # Strictly above INFO, so levels between INFO and WARNING are not lost.
    std_err_handler.setLevel(logging.INFO + 1)
    std_err_handler.setFormatter(formatter)

    logger.addHandler(std_out_handler)
    logger.addHandler(std_err_handler)
    return logger
def __init__(self, name, level):
    """Initialise the base filter, then store ``name`` and ``level``.

    The base initialiser is called without a name and ``name`` is assigned
    afterwards, so only the ``name`` attribute reflects the argument.
    """
    logging.Filter.__init__(self)
    self.level = level
    self.name = name
def test_logger_filter(self):
    """Filter at logger level: only levels >= 'Verbose' are emitted."""
    root = self.root_logger
    root.setLevel(VERBOSE)
    self.log_at_all_levels(root)

    expected_lines = [
        ('Verbose', '5'),
        ('Sociable', '6'),
        ('Effusive', '7'),
        ('Terse', '8'),
        ('Taciturn', '9'),
        ('Silent', '10'),
    ]
    self.assert_log_lines(expected_lines)
def test_handler_filter(self):
    """Filter at handler level: only levels >= 'Sociable' are emitted."""
    first_handler = self.root_logger.handlers[0]
    first_handler.setLevel(SOCIABLE)
    try:
        self.log_at_all_levels(self.root_logger)
        expected_lines = [
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected_lines)
    finally:
        # Put the handler back to "no threshold".
        first_handler.setLevel(logging.NOTSET)
def __init__(self, level):
    """Initialise the base filter and remember ``level`` as ``filter_level``."""
    logging.Filter.__init__(self)
    self.filter_level = level
def test_filter(self):
    """Only records from 'spam.eggs' and its children pass a name filter."""
    name_filter = logging.Filter("spam.eggs")
    handler = self.root_logger.handlers[0]
    try:
        handler.addFilter(name_filter)
        # Create every logger first, then emit one numbered message from
        # each in order; only the second and third should get through.
        loggers = [
            logging.getLogger(logger_name)
            for logger_name in ("spam", "spam.eggs", "spam.eggs.fish", "spam.bakedbeans")
        ]
        for current in loggers:
            current.info(self.next_message())
        self.assert_log_lines([
            ('spam.eggs', 'INFO', '2'),
            ('spam.eggs.fish', 'INFO', '3'),
        ])
    finally:
        handler.removeFilter(name_filter)

#
# First, we define our levels. There can be as many as you want - the only
# limitations are that they should be integers, the lowest should be > 0 and
# larger values mean less information being logged. If you need specific
# level values which do not fit into these limitations, you can use a
# mapping dictionary to convert between your application levels and the
# logging system.
#