我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 logging.NOTSET。
def ConvertLog4ToCFLevel( log4level ):
    """Translate a Python logging level number into a ``CF.LogLevels`` value.

    One step above ``logging.FATAL`` maps to ``OFF`` and ``logging.NOTSET``
    maps to ``ALL``. A level with no exact match falls back to
    ``CF.LogLevels.INFO``.
    """
    cf_levels = CF.LogLevels
    # Comparisons stay in the original order; logging.TRACE is not a stdlib
    # constant, so it is only looked up once the standard levels have failed.
    if log4level == logging.FATAL + 1:
        return cf_levels.OFF
    elif log4level == logging.FATAL:
        return cf_levels.FATAL
    elif log4level == logging.ERROR:
        return cf_levels.ERROR
    elif log4level == logging.WARN:
        return cf_levels.WARN
    elif log4level == logging.INFO:
        return cf_levels.INFO
    elif log4level == logging.DEBUG:
        return cf_levels.DEBUG
    elif log4level == logging.TRACE:
        return cf_levels.TRACE
    elif log4level == logging.NOTSET:
        return cf_levels.ALL
    return cf_levels.INFO
def log(self, level, msg, *args, **kwargs):
    """Forward a log call to the wrapped logger.

    The threshold comes from the echo flag (via ``self._echo_map``); when
    that yields ``logging.NOTSET`` the wrapped logger's effective level is
    used instead. ``logger._log()`` is invoked directly.
    """
    target = self.logger

    # Inlined equivalent of isEnabledFor()/getEffectiveLevel(), kept inline
    # to avoid the extra call overhead.
    if target.manager.disable >= level:
        return

    threshold = self._echo_map[self.echo]
    if threshold == logging.NOTSET:
        threshold = target.getEffectiveLevel()

    if level >= threshold:
        target._log(level, msg, args, **kwargs)
def run(self, argv = None, data = None, logger = None):
    """
    Runs the function

    Optionally replaces the instance logger and data, parses ``argv``,
    adjusts the console loggers according to the parsed log level, validates
    the required arguments and finally invokes the application callable.

    :param argv: argument list handed to the argument parser
    :param data: optional ``Configuration`` stored on ``self.data``
    :param logger: optional ``logging.Logger`` stored on ``self.logger``
    :return: the application's return value, or 1 if an exception occurred
    """
    if logger is not None:
        assert isinstance(logger, logging.Logger), "logger is not a valid logging.Logger"
        self.logger = logger
    if data is not None:
        assert isinstance(data, Configuration), "data is not a valid QXSConsolas.Configuration.Configuration"
        self.data = data

    self.options, self.arguments = self._argparser.parseArguments(argv)

    if self._argparser.loglevel == 1:
        self._configureConsoleLoggers(logging.NOTSET, True)
    elif self._argparser.loglevel == -1:
        self._configureConsoleLoggers(logging.CRITICAL, False)

    try:
        self._argparser.validateRequiredArguments()
        return self._app(ApplicationData(self))
    except Exception as e:
        # The ``logger`` argument is optional: fall back to the instance
        # logger so a failure is reported instead of raising AttributeError
        # on None.
        active_logger = logger if logger is not None else getattr(self, "logger", None)
        if active_logger is not None:
            active_logger.exception(e)
        return 1
def test_custom_handler(self, mocker):
    """A handler attached to the logger receives exactly one DEBUG record."""
    handler = DummyHandler()
    emit_spy = mocker.MagicMock()
    handler.emit = emit_spy

    logger = make_logger()
    logger.handlers = [handler]
    disable(NOTSET)

    logger.debug('test')

    assert emit_spy.call_count == 1
    name, args, kwargs = emit_spy.mock_calls[0]
    assert name == ''

    record = args[0]
    assert isinstance(record, LogRecord)
    assert record.msg == 'test'
    assert record.levelname == 'DEBUG'
    del logger
def level_to_int(level: Union[str, int]) -> int:
    """Normalize a log level given as an int or a level name to its int value.

    :param level: an integer in ``logging.NOTSET..logging.FATAL`` (0..50) or
        a case-insensitive level name such as ``"debug"``
    :return: the numeric log level
    :raises ValueError: if the integer is out of range or the name is not a
        log level
    :raises TypeError: if ``level`` is neither ``int`` nor ``str``
    """
    if isinstance(level, int):
        if logging.NOTSET <= level <= logging.FATAL:
            return level
        raise ValueError('Log level must be 0 <= level <= 50, '
                         'but got: {}'.format(level))

    if isinstance(level, str):
        resolved = getattr(logging, level.upper(), None)
        # Upper-case module attributes are not all levels (for example
        # logging.BASIC_FORMAT is a string), so only accept integers.
        if isinstance(resolved, int):
            return resolved
        raise ValueError('Invalid log level: {}'.format(level))

    raise TypeError(
        'Log level must be int (0 ~ 50) or string, '
        'but got type: {}'.format(type(level)))
def __init__(self, pathname, **settings):
    """Initial configuration for the singleton baka framework.

    :param pathname: stored as ``self.import_name`` (the name of the
        application package)
    :param settings: optional keyword settings for the pyramid configuration
    """
    self.import_name = pathname
    self.settings = settings
    self.__include = {}
    self.__trafaret = trafaret_yaml

    # A default handler is installed unless root handlers exist, the module
    # logger level is NOTSET and a LOGGING setting was supplied.
    already_configured = (
        logging.root.handlers
        and log.level == logging.NOTSET
        and settings.get('LOGGING')
    )
    if not already_configured:
        default_handler = logging.StreamHandler()
        default_handler.setFormatter(logging.Formatter(logging_format))
        log.addHandler(default_handler)
        log.setLevel(logging.INFO)
def get(self, key, *sources, default=None, log_level=None, log_value=True):
    """
    Get config value for key using default sources or provided sources.

    A successful get (not returning None) caches the value and its source;
    subsequent gets for that key return the cached value regardless of the
    other parameters.

    key -- the key for the value
    sources -- custom source order for this key; if no sources are given,
               the sources set by the constructor or the source property
               are used
    default -- returned if all sources fail; the default value is cached
               and logged as specified
    log_level -- overrides log_level from the constructor; makes get log
                 key, value and source on first use; set to logging.NOTSET
                 to turn off logging
    log_value -- set to False to prevent logging of the value while still
                 logging the source for this key
    """
    # Delegate to get_with_source() and discard the source half of the pair.
    found, _origin = self.get_with_source(
        key,
        *sources,
        default=default,
        log_level=log_level,
        log_value=log_value
    )
    return found
def to(cls, channel, host='127.0.0.1', port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, level=logging.NOTSET):
    """Convenience class method to create a ZmqLoghandler and connect to a ZMQ subscriber.

    Args:
        channel (string): Logging channel name. This is used to build a ZMQ topic.
        host (string): Hostname / ip address of the subscriber to publish to.
        port (int, string): Port on which to publish messages.
        level (int): Logging level
    """
    zmq_context = zmq.Context()
    pub_socket = zmq_context.socket(zmq.PUB)
    pub_socket.connect('tcp://{}:{}'.format(host, port))

    # This sleep hopefully fixes the silent joiner problem.
    time.sleep(0.1)
    return cls(channel, pub_socket, level=level)
def getLevel( self ):
    """
    A convenience wrapper around ``getEffectiveLevel()`` that reports the
    level by name rather than by its integer value.

    Returns:
        str: the name of the effective log level for this logging object,
        in lowercase (``"warning"``, ``"info"``, etc.), or
        ``"unknown (<level>)"`` for a non-standard level.
    """
    known_levels = (
        (logging.CRITICAL, 'critical'),
        (logging.ERROR, 'error'),
        (logging.WARNING, 'warning'),
        (logging.INFO, 'info'),
        (logging.DEBUG, 'debug'),
        (logging.NOTSET, 'notset'),
    )
    level = self.getEffectiveLevel()
    for number, label in known_levels:
        if level == number:
            return label
    return 'unknown ({})'.format( level )
def __init__(self, *args, **kwargs):
    """Set up the handler's client and initialise the logging handler.

    The client is taken from a single positional ``Client`` instance, else
    from the ``client`` keyword, else built by calling ``client_cls``
    (default ``Client``) with the remaining arguments. The handler level is
    read from the ``level`` keyword and defaults to ``logging.NOTSET``.

    Raises ValueError when the single positional argument is not a
    ``Client`` instance.
    """
    client_factory = kwargs.pop('client_cls', Client)

    if len(args) == 1:
        candidate = args[0]
        args = args[1:]
        if not isinstance(candidate, Client):
            raise ValueError(
                'The first argument to %s must be a Client instance, '
                'got %r instead.' % (
                    self.__class__.__name__,
                    candidate,
                ))
        self.client = candidate
    elif 'client' in kwargs:
        self.client = kwargs.pop('client')
    else:
        self.client = client_factory(*args, **kwargs)

    handler_level = kwargs.get('level', logging.NOTSET)
    logging.Handler.__init__(self, level=handler_level)
def test_get_level_name(self): """Test getLevelName returns level constant.""" # NOTE(flaper87): Bug #1517 self.assertEqual(logging.getLevelName('NOTSET'), 0) self.assertEqual(logging.getLevelName('DEBUG'), 10) self.assertEqual(logging.getLevelName('INFO'), 20) self.assertEqual(logging.getLevelName('WARN'), 30) self.assertEqual(logging.getLevelName('WARNING'), 30) self.assertEqual(logging.getLevelName('ERROR'), 40) self.assertEqual(logging.getLevelName('CRITICAL'), 50) self.assertEqual(logging.getLevelName(0), 'NOTSET') self.assertEqual(logging.getLevelName(10), 'DEBUG') self.assertEqual(logging.getLevelName(20), 'INFO') self.assertEqual(logging.getLevelName(30), 'WARNING') self.assertEqual(logging.getLevelName(40), 'ERROR') self.assertEqual(logging.getLevelName(50), 'CRITICAL')
def _handle_existing_loggers(existing, child_loggers, disable_existing):
    """
    When (re)configuring logging, deal with loggers that were part of the
    previous configuration but are absent from the new one.

    Deleting them is pointless because other threads may still hold
    references, so they are disabled instead, which stops them logging.
    Children of named loggers are not disabled (that is probably not what
    the user intended); they are reset to NOTSET with no handlers and with
    propagation on. Nothing is disabled when disable_existing is false.
    """
    registry_owner = logging.root
    for name in existing:
        logger = registry_owner.manager.loggerDict[name]
        if name not in child_loggers:
            if disable_existing:
                logger.disabled = True
            continue
        logger.level = logging.NOTSET
        logger.handlers = []
        logger.propagate = True
def get_console_logger():
    """Configure and return the root logger with a single console handler.

    The root logger level is set to NOTSET; the stream handler uses the
    module's ``FORMATTER`` and passes INFO and above. A TypeError during
    set-up is reported on stdout.
    """
    try:
        console_logger = logging.getLogger()
        console_logger.setLevel(logging.NOTSET)

        # If a handler is already attached, drop the last one so repeated
        # calls do not stack console handlers.
        if console_logger.handlers:
            console_logger.handlers.pop()

        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(FORMATTER)
        # Only INFO-level (and higher) messages reach the console.
        stream_handler.setLevel(logging.INFO)
        # TODO: Delete LessThanFilter if not needed in future
        console_logger.addHandler(stream_handler)
    except TypeError as e:
        sys.stdout.write(str("Console logger is having issues: {}\n".format(e)))
    return console_logger
def get_app_logger(name=None):
    """Configure and return the ``__webbreaker__`` application logger.

    :param name: key selecting the log file; only ``"__webbreaker__"`` is
        known. ``None`` (the default) selects that entry, so calling the
        function without arguments no longer raises KeyError.
    :return: the ``__webbreaker__`` logger with a single INFO-level file
        handler appending to the selected log file
    :raises KeyError: if ``name`` is given but is not a known log name
    """
    try:
        logger_map = {"__webbreaker__": APP_LOG}
        if name is None:
            name = "__webbreaker__"

        app_logger = logging.getLogger("__webbreaker__")
        app_logger.setLevel(logging.NOTSET)

        # If a handler is already attached, drop the last one so repeated
        # calls do not stack file handlers.
        if app_logger.handlers:
            app_logger.handlers.pop()

        # NOTE(review): the "(" before %(message)s is unbalanced; kept
        # as-is because existing log consumers may rely on this format.
        formatter = logging.Formatter('%(asctime)s: %(name)s %(levelname)s(%(message)s')
        fh = logging.FileHandler(logger_map[name], mode='a')
        fh.setFormatter(formatter)
        # The handler level was previously set to DEBUG and then immediately
        # overwritten with INFO; only the effective call is kept.
        fh.setLevel(logging.INFO)
        app_logger.addHandler(fh)
    except TypeError as e:
        sys.stdout.write(str("App logger error: {}!\n".format(e)))
    return app_logger
def get_debug_logger(name=None):
    """Configure and return the logger ``name`` with a DEBUG file handler.

    The logger level is set to NOTSET and a file handler appending to
    ``DEBUG_LOG`` is attached. A TypeError during set-up is reported on
    stdout.
    """
    try:
        debug_logger = logging.getLogger(name)
        debug_logger.setLevel(logging.NOTSET)

        # If a handler is already attached, drop the last one so repeated
        # calls do not stack file handlers.
        if debug_logger.handlers:
            debug_logger.handlers.pop()

        file_handler = logging.FileHandler(DEBUG_LOG, mode='a')
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s: %(name)s %(levelname)s(%(message)s')
        )
        file_handler.setLevel(logging.DEBUG)
        debug_logger.addHandler(file_handler)
    except TypeError as e:
        sys.stdout.write(str("Debug logger error: {}!\n".format(e)))
    return debug_logger


# Override existing hierarchical filter logic in logger
def tune(self, src_batch, trg_batch, epochs):
    """Train the loaded model on a batch of source/target suggestions.

    A tuner is created on first use and reused afterwards; each call runs
    exactly ``epochs`` epochs (min and max epochs are both set to it).
    """
    self._ensure_model_loaded()

    if self._tuner is None:
        gpu_ids = [0] if self._using_cuda else None
        self._tuner = NMTEngineTrainer(self._model, self._optim, self._src_dict, self._trg_dict,
                                       model_params=self._model_params, gpu_ids=gpu_ids)
        self._tuner.min_perplexity_decrement = -1.
        self._tuner.set_log_level(logging.NOTSET)

    self._tuner.min_epochs = self._tuner.max_epochs = epochs

    # Convert words to indexes [suggestions]
    src_indexes = []
    trg_indexes = []
    for src_words, trg_words in zip(src_batch, trg_batch):
        src_indexes.append(self._src_dict.convertToIdx(src_words, Constants.UNK_WORD))
        trg_indexes.append(self._trg_dict.convertToIdx(trg_words, Constants.UNK_WORD,
                                                       Constants.BOS_WORD, Constants.EOS_WORD))

    # Prepare data for training on the tuning batch
    tuning_dataset = Dataset(src_indexes, trg_indexes, 32, self._using_cuda)
    self._tuner.train_model(tuning_dataset, save_epochs=0)
def setLogLevel(self,level):
    ''' Change the logging level of the wrapped logger.

    A numeric level is applied directly. Any other value is resolved with
    ``getLogLevel()``: an unresolvable value triggers a warning, and a
    resolved level is applied (and reported at debug level) only when it
    differs from the current one. The ``level`` argument is returned
    unchanged.
    '''
    if type(level)==type(logging.NOTSET):
        self.log_obj.setLevel(level)
        return level

    resolved = self.getLogLevel(level)
    if resolved is None:
        self.warning('log.Logger: Logging level cannot be set to "%s"' %level)
    elif resolved!=self.log_obj.level:
        self.log_obj.setLevel(resolved)
        self.debug('log.Logger: Logging level set to "%s" = %s' %(level,resolved))
    return level
def add_loggers(self, loggers, stdout_level=logging.NOTSET, file_level=logging.NOTSET):
    """Adds loggers for stdout/filesystem handling.

    Stdout: loggers will log to stdout only when mentioned in the `log`
    option. If they are mentioned without an explicit level,
    `stdout_level` will be used.

    Filesystem: loggers will log to files at `file_level`.

    :arg loggers: List of logger names.
    :arg stdout_level: Default level at which stdout handlers will pass
        logs. By default: `logging.NOTSET`, which means: pass everything.
    :arg file_level: Level at which filesystem handlers will pass logs.
        By default: `logging.NOTSET`, which means: pass everything.
    """
    self._enabled = True
    entry = (
        loggers,
        _sanitize_level(stdout_level),
        _sanitize_level(file_level),
    )
    self._loggers.append(entry)
def _resetLogging(self):
    """Attach file handlers for the state, build and root logs, once.

    The handlers are added on top of whatever the config file set up, so
    log records go to both places. Later calls return immediately.
    """
    # ensure we dont attach the handlers multiple times.
    if self.logging_initialized:
        return
    self.logging_initialized = True

    with self.uid_manager:
        util.mkdirIfAbsent(self.resultdir)

        # (logger, file name, format string) for each log file in resultdir.
        log_targets = (
            (self.state.state_log, "state.log", self.config['state_log_fmt_str']),
            (self.build_log, "build.log", self.config['build_log_fmt_str']),
            (self.root_log, "root.log", self.config['root_log_fmt_str']),
        )
        for (target_log, file_name, fmt_str) in log_targets:
            file_handler = logging.FileHandler(os.path.join(self.resultdir, file_name), "a+")
            file_handler.setFormatter(logging.Formatter(fmt_str))
            file_handler.setLevel(logging.NOTSET)
            target_log.addHandler(file_handler)
            target_log.info("Mock Version: %s", self.config['version'])
def emit(self, record):
    """Forward a log record to ``xbmc.log`` at the mapped level.

    Records below WARNING are dropped when a module filter
    (``self._modules``) is configured and the record's logger name is not
    in it. If logging the formatted text fails, a UTF-8 encoded copy is
    tried; if that fails too, a fixed "Unicode Error" notice is logged.

    :param record: the ``logging.LogRecord`` to emit
    """
    if record.levelno < logging.WARNING and self._modules and record.name not in self._modules:
        # Log INFO and DEBUG only with enabled modules
        return

    levels = {
        logging.CRITICAL: xbmc.LOGFATAL,
        logging.ERROR: xbmc.LOGERROR,
        logging.WARNING: xbmc.LOGWARNING,
        logging.INFO: xbmc.LOGNOTICE,
        logging.DEBUG: xbmc.LOGSEVERE,
        logging.NOTSET: xbmc.LOGNONE,
    }
    # ``except Exception`` instead of a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are no longer swallowed by the
    # fallbacks.
    try:
        xbmc.log(self.format(record), levels[record.levelno])
    except Exception:
        try:
            xbmc.log(self.format(record).encode('utf-8', 'ignore'), levels[record.levelno])
        except Exception:
            xbmc.log(b"[%s] Unicode Error in message text" % self.pluginName, levels[record.levelno])
def DEFAULT_LOGGING_CONFIG(level=logging.WARN, format=LOG_FORMAT):
    """Returns a default logging config in dict format.

    Compatible with logging.config.dictConfig(): the root logger is set to
    `level` and gets a `sys.stdout` console handler whose formatter is
    initialized with `format`. A simple 'brief' formatter showing only the
    message portion of log entries is defined as well.
    """
    formatters = {
        "generic": {"format": format},
        "brief": {"format": "%(message)s"},
    }
    console_handler = {
        "class": "logging.StreamHandler",
        "level": "NOTSET",
        "formatter": "generic",
        "stream": "ext://sys.stdout",
    }
    root_logger = {
        "level": level,
        "handlers": ["console"],
    }
    return {
        "version": 1,
        "formatters": formatters,
        "handlers": {"console": console_handler},
        "root": root_logger,
        "loggers": {},
    }
def __init__(self, login_ip, login_port, game_ip, game_port, magic=None, single_quotes=False, logger=None):
    """Store the connection endpoints and reset all client state.

    When no ``logger`` is supplied, the root logger is used: its level is
    set to NOTSET and a stdout stream handler is added to it.
    """
    # Endpoints
    self._login_ip = login_ip
    self._login_port = login_port
    self._game_ip = game_ip
    self._game_port = game_port

    # Logging
    if logger is None:
        logger = logging.getLogger()
        logger.setLevel(logging.NOTSET)
        logger.addHandler(logging.StreamHandler(sys.stdout))
    self._logger = logger

    # Protocol options
    if magic is None:
        self._magic = "Y(02.>'H}t\":E1"
    else:
        self._magic = magic
    self._single_quotes = single_quotes

    # Connection and session state
    self._connected = False
    self._buffer = ""
    self._handlers = {}
    self._nexts = []
    self._internal_room_id = -1
    self._id = -1
    self._coins = -1
    self._room = -1
    self._penguins = {}
    self._follow = None
def setup_logging(handlers, facility, level):
    """Create the module-global ``export-trac`` logger.

    ``handlers`` selects the outputs: ``'syslog'``, ``'stdout'`` or
    ``'both'``. ``facility`` is passed to the syslog handler. ``level`` is
    an upper-case level name; an unknown name raises KeyError.
    """
    global log
    log = logging.getLogger('export-trac')

    fields = ['%(asctime)s', '%(name)s', '%(levelname)s', '%(message)s']
    formatter = logging.Formatter(' | '.join(fields))

    if handlers in ('syslog', 'both'):
        syslog_handler = logging.handlers.SysLogHandler(address='/dev/log', facility=facility)
        syslog_handler.setFormatter(formatter)
        log.addHandler(syslog_handler)

    if handlers in ('stdout', 'both'):
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        log.addHandler(console_handler)

    levels_by_name = {
        'CRITICAL': logging.CRITICAL,
        'ERROR': logging.ERROR,
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'NOTSET': logging.NOTSET,
    }
    log.setLevel(levels_by_name[level])
def __init__( self, nameparts, propagate=False, level=logging.NOTSET, log_dir=None, *args, **kwargs ):
    """Create the named logger and apply the propagate/level/log_dir options.

    The logger name is the concatenation of the string forms of the values
    of ``nameparts``. Extra positional and keyword arguments are accepted
    and ignored.
    """
    super(BaseLogger, self).__init__()

    self._nameparts = nameparts
    # Backing fields start out unset; the public attributes are assigned
    # only after the logger exists.
    self._propagate = self._level = self._log_dir = None

    self._name = ''.join(str(part) for part in nameparts.values())
    self.logger = logging.getLogger(self._name)

    self.propagate = propagate
    self.level = level
    self.log_dir = log_dir
def setup_logging(opts):
    """Configure the root logger from the ``opts.verbose`` count.

    A NullHandler is always added to the root logger. When ``opts.verbose``
    is truthy, a UTC-timestamped stdout handler is added and the root level
    is chosen by verbosity: 1 -> CRITICAL, 2 -> ERROR, 3 -> WARNING,
    4 -> INFO, 5 -> DEBUG, 6 or more -> NOTSET.

    :param opts: object with an integer ``verbose`` attribute
    """
    logging.root.addHandler(logging.NullHandler())
    if not opts.verbose:
        return

    logging_levels = (
        logging.CRITICAL,
        logging.ERROR,
        logging.WARNING,
        logging.INFO,
        logging.DEBUG,
        logging.NOTSET,
    )
    fmt = '%(asctime)-15s.%(msecs)03d [%(threadName)10.10s] [%(levelname)6.6s] %(name)s#%(funcName)s:%(lineno)s %(message)s'
    datefmt = '%Y-%m-%dT%H:%M:%S'
    formatter = logging.Formatter(fmt, datefmt=datefmt)
    formatter.converter = time.gmtime  # timestamps in UTC

    hnd = logging.StreamHandler(stream=sys.stdout)
    hnd.setFormatter(formatter)
    logging.root.addHandler(hnd)

    # Clamp so that an out-of-range verbosity selects the nearest valid
    # level instead of raising KeyError.
    index = min(max(opts.verbose - 1, 0), len(logging_levels) - 1)
    logging.root.setLevel(logging_levels[index])
def __init__(self, level=logging.NOTSET, host=mongo_server, port=27017, database_name='logs', collection='logs', username=None, password=None, fail_silently=False, formatter=None):
    """Set up the mongo handler and open the database connection.

    The connection is opened through ``self._connect()`` once every
    setting has been stored. When no ``formatter`` is supplied a
    ``MongoFormatter`` is used.
    """
    logging.Handler.__init__(self, level)

    # Connection settings
    self.host = host
    self.port = port
    self.database_name = database_name
    self.collection_name = collection
    self.username = username
    self.password = password
    self.fail_silently = fail_silently

    # Connection state, expected to be filled in by _connect()
    self.connection = None
    self.db = None
    self.collection = None
    self.authenticated = False

    self.formatter = formatter if formatter else MongoFormatter()
    self._connect()
def _handle_existing_loggers(existing, child_loggers, disable_existing):
    """
    When (re)configuring logging, deal with loggers that were part of the
    previous configuration but are absent from the new one.

    Deleting them is pointless because other threads may still hold
    references, so they are disabled instead, which stops them logging.
    Children of named loggers are not disabled (that is probably not what
    the user intended); they are reset to NOTSET with no handlers and with
    propagation on. Every other logger gets ``disabled`` set to
    ``disable_existing``, so a false value re-enables it.
    """
    registry_owner = logging.root
    for name in existing:
        logger = registry_owner.manager.loggerDict[name]
        if name not in child_loggers:
            logger.disabled = disable_existing
            continue
        logger.level = logging.NOTSET
        logger.handlers = []
        logger.propagate = True
def init_logger(self, log_type, path):
    """
    Attach a rotating file handler and a console handler to the root logger.

    :param log_type: Level to set on the logger. Example logging.NOTSET
    :param path: Directory where the log file will be created
    """
    self.__log_file_path = os.path.join(path, self.__log_filename)
    log_formatter = logging.Formatter(
        "%(asctime)s [%(levelname)-8.8s] %(message)s")
    self.__logger = logging.getLogger()

    rotating_handler = SizedTimedRotatingFileHandler(
        self.__log_file_path,
        max_bytes=1000000,
        backup_count=5,
        interval=24,
        # encoding='bz2',  # uncomment for bz2 compression
    )
    stream_handler = logging.StreamHandler()

    # File handler first, then console, as before.
    for handler in (rotating_handler, stream_handler):
        handler.setFormatter(log_formatter)
        self.__logger.addHandler(handler)

    self.__logger.setLevel(log_type)
def saveLoggingContext(self, logcfg_url, oldstyle_loglevel, rscCtx ):
    """Record the logging configuration source and apply the initial log level.

    logcfg_url        -- URL of the logging configuration; None or "" selects
                         the default configuration from ossie.logger
    oldstyle_loglevel -- legacy numeric level; applied only when it is not
                         None and greater than -1
    rscCtx            -- optional resource context applied to the logging
                         macro table

    NOTE(review): block nesting was reconstructed from a whitespace-collapsed
    source; confirm against the upstream file.
    """
    # apply resource context to macro definitions
    if rscCtx:
        rscCtx.apply(self.loggingMacros )
        self.loggingCtx = rscCtx

    # test we have a logging URLx
    self.loggingURL = logcfg_url
    if logcfg_url==None or logcfg_url=="" :
        self.logConfig = ossie.logger.GetDefaultConfig()
    else:
        # try to process URL and grab contents
        try:
            cfg_data=ossie.logger.GetConfigFileContents( logcfg_url )
            if cfg_data and len(cfg_data) > 0 :
                self.logConfig = ossie.logger.ExpandMacros( cfg_data, self.loggingMacros )
        except:
            # NOTE(review): every failure (including KeyboardInterrupt) is
            # silently ignored here and self.logConfig is left untouched.
            pass

    # apply logging level if explicitly stated
    if oldstyle_loglevel != None and oldstyle_loglevel > -1 :
        loglevel = ossie.logger.ConvertLogLevel(oldstyle_loglevel)
        self.setLogLevel( "", loglevel )
        if self._log and self._log.getEffectiveLevel() == logging.NOTSET:
            # NOTE(review): this calls setLevel() while the other branches
            # call setLogLevel() -- confirm that setLevel exists on this class.
            self.setLevel( self._logid, loglevel )
    else:
        # no explicit level: inherit the root logger's effective level
        if self._log and self._log.getEffectiveLevel() == logging.NOTSET:
            loglevel = ossie.logger.ConvertLog4ToCFLevel( logging.getLogger(None).getEffectiveLevel() )
            self.setLogLevel( self._logid, loglevel )

    # assign an event channel manager to the logging library
    ossie.logger.SetEventChannelManager( self._ecm )
def __init__(self):
    """Start with stdout as the stream and a NOTSET threshold."""
    default_stream = sys.stdout  # in log4j stdout is default
    self.stream = default_stream
    self.threshold = logging.NOTSET
    self.log4pyProps = dict(strm=default_stream)
def __init__(self):
    """Start with a NOTSET threshold and no log4py properties."""
    self.threshold = logging.NOTSET
    self.log4pyProps = dict()
def setUp(self):
    """Create four float sink ports, one multi-out source port and the
    connection descriptor table used by the multi-out tests."""
    self.setContext()
    self.seq = self.srcData if self.srcData else range(100)

    self.orb = CORBA.ORB_init()
    self.rootPOA = self.orb.resolve_initial_references("RootPOA")
    self.logger = logging.getLogger(self.ptype[0])
    self.logger.setLevel(logging.NOTSET)

    self.logger.info( "Setup - Multiout Create Ports Table " )
    self.ip1 = bulkio.InFloatPort("sink_1", self.logger )
    self.ip1_oid = self.rootPOA.activate_object(self.ip1)
    self.ip2 = bulkio.InFloatPort("sink_2", self.logger )
    self.ip2_oid = self.rootPOA.activate_object(self.ip2)
    self.ip3 = bulkio.InFloatPort("sink_3", self.logger )
    self.ip3_oid = self.rootPOA.activate_object(self.ip3)
    self.ip4 = bulkio.InFloatPort("sink_4", self.logger )
    self.ip4_oid = self.rootPOA.activate_object(self.ip4)
    self.port = bulkio.OutFloatPort("multiout_source", self.logger )
    self.port_oid = self.rootPOA.activate_object(self.port)

    self.desc_list = []
    self.logger.info( "Setup - Multiout Connection Table " )
    # (connection id, stream id) routes, in the original table order.
    routes = (
        ("connection_1", "stream-1-1"),
        ("connection_1", "stream-1-2"),
        ("connection_1", "stream-1-3"),
        ("connection_2", "stream-2-1"),
        ("connection_2", "stream-2-2"),
        ("connection_2", "stream-2-3"),
        ("connection_3", "stream-3-1"),
        ("connection_3", "stream-3-2"),
        ("connection_3", "stream-3-3"),
        ("connection_4", "stream-4-1"),
    )
    for connection_id, stream_id in routes:
        self.desc_list.append(
            bulkio.connection_descriptor_struct(
                port_name="multiout_source",
                connection_id=connection_id,
                stream_id=stream_id,
            )
        )
def _resetExistingLoggers(parent="root"):
    """ Reset the logger named 'parent' and all its children to their
    initial state, if they already exist in the current configuration.

    The root logger is reset to WARNING with no handlers and no filters;
    every other logger is reset to NOTSET with no handlers or filters,
    propagation on and ``disabled`` cleared. Nothing happens when
    ``parent`` is not "root" and no logger of that name exists.
    """
    root = logging.root
    # get sorted list of all existing loggers
    existing = sorted(root.manager.loggerDict.keys())
    if parent == "root":
        # all the existing loggers are children of 'root'
        loggers_to_reset = [parent] + existing
    elif parent not in existing:
        # nothing to do
        return
    else:
        # the parent itself plus every "<parent>.*" descendant
        prefixed = parent + "."
        loggers_to_reset = [parent]
        loggers_to_reset.extend(
            name for name in existing[existing.index(parent) + 1:]
            if name.startswith(prefixed)
        )
    for name in loggers_to_reset:
        if name == "root":
            root.setLevel(logging.WARNING)
            for h in root.handlers[:]:
                root.removeHandler(h)
            for f in root.filters[:]:
                # Logger has removeFilter(); the former removeFilters()
                # call raised AttributeError whenever a filter was present.
                root.removeFilter(f)
            root.disabled = False
        else:
            logger = root.manager.loggerDict[name]
            logger.level = logging.NOTSET
            logger.handlers = []
            logger.filters = []
            logger.propagate = True
            logger.disabled = False
def __init__(self, level=logging.NOTSET):
    """
    Initialize the handler with the given level, delegating to
    ``logging.Handler``.
    """
    logging.Handler.__init__(self, level=level)
def __init__(self, name=None, router=None, load_env=True, log_config=LOGGING):
    """Create the application object.

    Applies ``log_config`` via ``dictConfig`` when given, installs a
    default stream handler when logging does not look configured, and
    derives ``name`` from the caller's module when it is not supplied.
    """
    if log_config:
        logging.config.dictConfig(log_config)

    # Only set up a default log handler if the
    # end-user application didn't set anything up.
    if not logging.root.handlers or log.level != logging.NOTSET or not log_config:
        default_handler = logging.StreamHandler()
        default_handler.setFormatter(
            logging.Formatter("%(asctime)s: %(levelname)s: %(message)s")
        )
        log.addHandler(default_handler)
        log.setLevel(logging.INFO)

    # Get name from previous stack frame. This must stay inline: moving it
    # into a helper would change which frame index 1 refers to.
    if name is None:
        caller_record = stack()[1]
        name = getmodulename(caller_record[1])

    self.name = name
    self.config = Config(load_env=load_env)
    self.router = router or PathRouter()
    self.debug = None
    self.static_handler = None
def getEffectiveLevel(self):
    """What's the effective level for this logger?

    The level selected by the echo flag wins; when that is NOTSET the
    wrapped logger's own effective level is reported.
    """
    echo_level = self._echo_map[self.echo]
    if echo_level != logging.NOTSET:
        return echo_level
    return self.logger.getEffectiveLevel()