我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用logging.WARN。
def ConvertLog4ToCFLevel( log4level ): if log4level == logging.FATAL+1 : return CF.LogLevels.OFF if log4level == logging.FATAL : return CF.LogLevels.FATAL if log4level == logging.ERROR : return CF.LogLevels.ERROR if log4level == logging.WARN : return CF.LogLevels.WARN if log4level == logging.INFO : return CF.LogLevels.INFO if log4level == logging.DEBUG : return CF.LogLevels.DEBUG if log4level == logging.TRACE : return CF.LogLevels.TRACE if log4level == logging.NOTSET: return CF.LogLevels.ALL return CF.LogLevels.INFO
def ConvertToLog4Level( newLevel ): level = logging.INFO if newLevel == CF.LogLevels.OFF : level=logging.FATAL+1 if newLevel == CF.LogLevels.FATAL : level=logging.FATAL if newLevel == CF.LogLevels.ERROR : level=logging.ERROR if newLevel == CF.LogLevels.WARN : level=logging.WARN if newLevel == CF.LogLevels.INFO: level=logging.INFO if newLevel == CF.LogLevels.DEBUG: level=logging.DEBUG if newLevel == CF.LogLevels.TRACE: level=logging.TRACE if newLevel == CF.LogLevels.ALL: level=logging.TRACE return level
def deprecated(self, removal_version, msg, *args, **kwargs): """ Logs deprecation message which is log level WARN if the ``removal_version`` is > 1 minor release away and log level ERROR otherwise. removal_version should be the version that the deprecated feature is expected to be removed in, so something that will not exist in version 1.7, but will in 1.6 would have a removal_version of 1.7. """ from pip import __version__ if should_warn(__version__, removal_version): self.warn(msg, *args, **kwargs) else: self.error(msg, *args, **kwargs)
def __init__(self, **kwargs): super(Feature, self).__init__(**kwargs) self.result_file = "{}/{}.log".format( CONST.__getattribute__('dir_results'), self.case_name) try: module = kwargs['run']['module'] self.logger = logging.getLogger(module) except KeyError: self.__logger.warning( "Cannot get module name %s. Using %s as fallback", kwargs, self.case_name) self.logger = logging.getLogger(self.case_name) handler = logging.StreamHandler() handler.setLevel(logging.WARN) self.logger.addHandler(handler) handler = logging.FileHandler(self.result_file) handler.setLevel(logging.DEBUG) self.logger.addHandler(handler) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler)
def setup_logging(app): """Setup logging.""" from logging.handlers import RotatingFileHandler from logging import Formatter log_file_path = app.config.get('LOG_FILE') log_level = app.config.get('LOG_LEVEL', logging.WARN) if log_file_path: # pragma: no cover file_handler = RotatingFileHandler(log_file_path) file_handler.setFormatter(Formatter( '%(name)s:%(levelname)s:[%(asctime)s] %(message)s ' '[in %(pathname)s:%(lineno)d]' )) file_handler.setLevel(log_level) app.logger.addHandler(file_handler) logger = logging.getLogger('pybossa') logger.setLevel(log_level) logger.addHandler(file_handler)
def main(): args = parse_args() logging.basicConfig(level=(logging.WARN if args.quiet else logging.INFO)) # Don't allow more than 10 concurrent requests to the wayback machine concurrency = min(args.concurrency, 10) # Scrape results are stored in a temporary folder if no folder specified target_folder = args.target_folder if args.target_folder else tempfile.gettempdir() logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder)) # Parse the period entered by the user (throws an exception if the dates are not correctly formatted) from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT) to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT) # The scraper downloads the elements matching the given xpath expression in the target folder scraper = Scraper(target_folder, args.xpath) # Launch the scraping using the scraper previously instantiated scrape_archives(args.website_url, scraper.scrape, from_date, to_date, args.user_agent, timedelta(days=args.delta), concurrency)
def set_log(level, filename='jumpserver.log'): """ return a log file object ??????log?? """ log_file = os.path.join(LOG_DIR, filename) if not os.path.isfile(log_file): os.mknod(log_file) os.chmod(log_file, 0777) log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR, 'critical': logging.CRITICAL} logger_f = logging.getLogger('jumpserver') logger_f.setLevel(logging.DEBUG) fh = logging.FileHandler(log_file) fh.setLevel(log_level_total.get(level, logging.DEBUG)) formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) logger_f.addHandler(fh) return logger_f
def tolog(self, msg, level=None): try: level = level if level else self._level level = str(level).lower() level = self.get_map_level(level) if level == logging.DEBUG: self._logger.debug(msg) if level == logging.INFO: self._logger.info(msg) if level == logging.WARN: self._logger.warn(msg) if level == logging.ERROR: self._logger.error(msg) if level == logging.CRITICAL: self._logger.critical(msg) except Exception as expt: print expt
def run(infile, outfile, time_interval, quiet): logging.basicConfig(level=logging.WARN if quiet else logging.INFO) logger = logging.getLogger(__name__) logger.info('loading input file %s ...' % infile) with open(infile) as fin: # Do not use click.File because we want close the file asap data = json.load(fin) n = len(data) logger.info( 'loading input file %s done. %d data found.'% (infile, n)) for i in xrange(len(data)): logger.info('Sleeping for %d sec [%d/%d] ...' % (time_interval, i+1, n)) time.sleep(time_interval) with open(outfile, 'w') as fout: json.dump(data[:(i+1)], fout) logger.info('Dumped %dth/%d data to %s' % (i+1, n, outfile))
def cli(ctx, config, quiet, debug): """ Tumblr Downloader CLI utility """ # Logging setup if debug: log_level = logging.DEBUG else: log_level = logging.CRITICAL if quiet else logging.WARN ctx.log = logging.getLogger('tumdlr') ctx.log.setLevel(log_level) ch = logging.StreamHandler() ch.setLevel(log_level) ch.setFormatter(logging.Formatter('[%(levelname)s] %(name)s: %(message)s')) ctx.log.addHandler(ch) # First run? if not ctx.config['Development'].getboolean('AgreedToTerms'): first_run(ctx)
def _load_config(cls, path): if cls._loaded_config == path: return LOG.debug('Load ramdisk-func-test configuration') args = {} if path: args['default_config_files'] = [path] conf.CONF([], project=conf.PROJECT_NAME, **args) # configure log level for libs we are using for channel, level in [ ('paramiko', logging.WARN), ('ironic.openstack.common', logging.WARN)]: logger = logging.getLogger(channel) logger.setLevel(level) cls._loaded_config = path
def test_setup_logger_logfile_custom_loglevel(capsys): """ setup_logger(..) with filelogger and custom loglevel """ logzero.reset_default_logger() temp = tempfile.NamedTemporaryFile() try: logger = logzero.setup_logger(logfile=temp.name, fileLoglevel=logging.WARN) logger.info("info1") logger.warn("warn1") with open(temp.name) as f: content = f.read() assert "] info1" not in content assert "] warn1" in content finally: temp.close()
def test_api_loglevel(capsys): """ Should reconfigure the internal logger loglevel """ logzero.reset_default_logger() temp = tempfile.NamedTemporaryFile() try: logzero.logfile(temp.name) logzero.logger.info("info1") logzero.loglevel(logging.WARN) logzero.logger.info("info2") logzero.logger.warn("warn1") with open(temp.name) as f: content = f.read() assert "] info1" in content assert "] info2" not in content assert "] warn1" in content finally: temp.close()
def test_api_loglevel_custom_handlers(capsys): """ Should reconfigure the internal logger loglevel and custom handlers """ logzero.reset_default_logger() # TODO pass # temp = tempfile.NamedTemporaryFile() # try: # logzero.logfile(temp.name) # logzero.logger.info("info1") # logzero.loglevel(logging.WARN) # logzero.logger.info("info2") # logzero.logger.warn("warn1") # with open(temp.name) as f: # content = f.read() # assert "] info1" in content # assert "] info2" not in content # assert "] warn1" in content # finally: # temp.close()
def setup_default_logger(logfile=None, level=logging.DEBUG, formatter=None, maxBytes=0, backupCount=0): """ Deprecated. Use `logzero.loglevel(..)`, `logzero.logfile(..)`, etc. Globally reconfigures the default `logzero.logger` instance. Usage: .. code-block:: python from logzero import logger, setup_default_logger setup_default_logger(level=logging.WARN) logger.info("hello") # this will not be displayed anymore because minimum loglevel was set to WARN :arg string logfile: If set, also write logs to the specified filename. :arg int level: Minimum `logging-level <https://docs.python.org/2/library/logging.html#logging-levels>`_ to display (default: `logging.DEBUG`). :arg Formatter formatter: `Python logging Formatter object <https://docs.python.org/2/library/logging.html#formatter-objects>`_ (by default uses the internal LogFormatter). :arg int maxBytes: Size of the logfile when rollover should occur. Defaults to 0, rollover never occurs. :arg int backupCount: Number of backups to keep. Defaults to 0, rollover never occurs. """ global logger logger = setup_logger(name=LOGZERO_DEFAULT_LOGGER, logfile=logfile, level=level, formatter=formatter) return logger
def _logWriter(self,level,message,exception=None): self._logger.setLevel(level) self._fh.setLevel(level) self._ch.setLevel(level) if(exception!=None): exFormatted = self._formatException(exception) msg = "%s%s" % (message,exFormatted) if(level==logging.DEBUG): logging.debug(msg) elif(level==logging.INFO): logging.info(msg) elif(level==logging.WARN): logging.warn(msg) elif(level==logging.FATAL): logging.fatal(msg) if(level==logging.ERROR): logging.error(msg)
def queue_packet(self, packet): """Queues sending a packet to its intended owner""" connection_id = packet.get_receiver_connection_id() if not self.socket_is_open(connection_id): # Warn if there is no socket to send through for the expected recip shared_utils.print_and_log( logging.WARN, 'Can not send packet to worker_id {}: packet queue not found. ' 'Message: {}'.format(connection_id, packet.data) ) return shared_utils.print_and_log( logging.DEBUG, 'Put packet ({}) in queue ({})'.format(packet.id, connection_id) ) # Get the current time to put packet into the priority queue self.packet_map[packet.id] = packet item = (time.time(), packet) self._safe_put(connection_id, item)
def approve_work(self): """Approving work after it has been submitted""" if self.hit_is_abandoned: self._print_not_available_for('review') else: if self.manager.get_agent_work_status(self.assignment_id) == \ self.ASSIGNMENT_DONE: self.manager.approve_work(assignment_id=self.assignment_id) shared_utils.print_and_log( logging.INFO, 'Conversation ID: {}, Agent ID: {} - HIT is ' 'approved.'.format(self.conversation_id, self.id) ) else: shared_utils.print_and_log( logging.WARN, 'Cannot approve HIT. Turker hasn\'t completed the HIT yet.' )
def reject_work(self, reason='unspecified'): """Reject work after it has been submitted""" if self.hit_is_abandoned: self._print_not_available_for('review') else: if self.manager.get_agent_work_status(self.assignment_id) == \ self.ASSIGNMENT_DONE: self.manager.reject_work(self.assignment_id, reason) shared_utils.print_and_log( logging.INFO, 'Conversation ID: {}, Agent ID: {} - HIT is ' 'rejected.'.format(self.conversation_id, self.id) ) else: shared_utils.print_and_log( logging.WARN, 'Cannot reject HIT. Turker hasn\'t completed the HIT yet.' )
def train(env_id, num_timesteps, seed, policy, lrschedule, num_cpu): def make_env(rank): def _thunk(): env = make_atari(env_id) env.seed(seed + rank) env = bench.Monitor(env, logger.get_dir() and os.path.join(logger.get_dir(), str(rank))) gym.logger.setLevel(logging.WARN) return wrap_deepmind(env) return _thunk set_global_seeds(seed) env = SubprocVecEnv([make_env(i) for i in range(num_cpu)]) if policy == 'cnn': policy_fn = AcerCnnPolicy elif policy == 'lstm': policy_fn = AcerLstmPolicy else: print("Policy {} not implemented".format(policy)) return learn(policy_fn, env, seed, total_timesteps=int(num_timesteps * 1.1), lrschedule=lrschedule) env.close()
def train(env_id, num_timesteps, seed, policy, lrschedule, num_cpu): def make_env(rank): def _thunk(): env = make_atari(env_id) env.seed(seed + rank) env = bench.Monitor(env, logger.get_dir() and os.path.join(logger.get_dir(), str(rank))) gym.logger.setLevel(logging.WARN) return wrap_deepmind(env) return _thunk set_global_seeds(seed) env = SubprocVecEnv([make_env(i) for i in range(num_cpu)]) if policy == 'cnn': policy_fn = CnnPolicy elif policy == 'lstm': policy_fn = LstmPolicy elif policy == 'lnlstm': policy_fn = LnLstmPolicy learn(policy_fn, env, seed, total_timesteps=int(num_timesteps * 1.1), lrschedule=lrschedule) env.close()
def train(env_id, num_timesteps, seed): import baselines.common.tf_util as U sess = U.single_threaded_session() sess.__enter__() rank = MPI.COMM_WORLD.Get_rank() if rank != 0: logger.set_level(logger.DISABLED) workerseed = seed + 10000 * MPI.COMM_WORLD.Get_rank() set_global_seeds(workerseed) env = gym.make(env_id) def policy_fn(name, ob_space, ac_space): return MlpPolicy(name=name, ob_space=env.observation_space, ac_space=env.action_space, hid_size=32, num_hid_layers=2) env = bench.Monitor(env, logger.get_dir() and osp.join(logger.get_dir(), str(rank))) env.seed(workerseed) gym.logger.setLevel(logging.WARN) trpo_mpi.learn(env, policy_fn, timesteps_per_batch=1024, max_kl=0.01, cg_iters=10, cg_damping=0.1, max_timesteps=num_timesteps, gamma=0.99, lam=0.98, vf_iters=5, vf_stepsize=1e-3) env.close()
def train(env_id, num_timesteps, seed): from baselines.ppo1 import mlp_policy, pposgd_simple U.make_session(num_cpu=1).__enter__() set_global_seeds(seed) env = gym.make(env_id) def policy_fn(name, ob_space, ac_space): return mlp_policy.MlpPolicy(name=name, ob_space=ob_space, ac_space=ac_space, hid_size=64, num_hid_layers=2) env = bench.Monitor(env, logger.get_dir()) env.seed(seed) gym.logger.setLevel(logging.WARN) pposgd_simple.learn(env, policy_fn, max_timesteps=num_timesteps, timesteps_per_actorbatch=2048, clip_param=0.2, entcoeff=0.0, optim_epochs=10, optim_stepsize=3e-4, optim_batchsize=64, gamma=0.99, lam=0.95, schedule='linear', ) env.close()
def train(env_id, num_timesteps, seed): from baselines.pposgd import mlp_policy, pposgd_simple U.make_session(num_cpu=1).__enter__() logger.session().__enter__() set_global_seeds(seed) env = gym.make(env_id) def policy_fn(name, ob_space, ac_space): return mlp_policy.MlpPolicy(name=name, ob_space=ob_space, ac_space=ac_space, hid_size=64, num_hid_layers=2) env = bench.Monitor(env, osp.join(logger.get_dir(), "monitor.json")) env.seed(seed) gym.logger.setLevel(logging.WARN) pposgd_simple.learn(env, policy_fn, max_timesteps=num_timesteps, timesteps_per_batch=2048, clip_param=0.2, entcoeff=0.0, optim_epochs=10, optim_stepsize=3e-4, optim_batchsize=64, gamma=0.99, lam=0.95, ) env.close()
def init_logger(logfile, verbose=False): """Initialize the logger.""" global stdout_logger, file_logger, logger stdout_logger_lvl = logging.DEBUG if verbose else logging.INFO stdout_logger = logging.StreamHandler(sys.stdout) stdout_logger.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) stdout_logger.setLevel(stdout_logger_lvl) file_logger = logging.FileHandler(logfile) file_logger.setLevel(logging.DEBUG) file_logger.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(stdout_logger) logger.addHandler(file_logger) # Toggle output level for stdout logger (stdout_logger_lvl or logging.WARN)
def parse_args(args): parser = argparse.ArgumentParser() group = parser.add_mutually_exclusive_group() group.add_argument('-v', '--verbose', help='Verbose (debug) logging', action='store_const', const=logging.DEBUG, dest='loglevel') group.add_argument('-q', '--quiet', help='Silent mode, only log warnings', action='store_const', const=logging.WARN, dest='loglevel') parser.add_argument("-s", '--skip', action='store_true', dest='skip_errors', help="Do not stop if one steps is failed") parser.add_argument("-e", '--callback', help='Callback for backup file (backup path ' 'passed as a 1st arg)') parser.add_argument( 'backup_dir', help="Destination for all created files") parser.set_defaults(func=do_node_backup) return parser.parse_args(args)
def single_help(bot, cmd, cmd_name) -> Embed: """ Generate help embed for a given embed. :return: the embed object for the given command. """ doc = cmd.help try: help_dict = safe_load(doc) except (YAMLError, AttributeError) as e: bot.logger.log(WARN, str(e)) return Embed(colour=bot.colour, description=doc) else: embed = Embed( colour=bot.colour, description=help_dict.pop('Description') ) embed.set_author(name=cmd_name, icon_url=bot.user.avatar_url) if cmd.aliases: embed.add_field(name='Aliases', value=f'`{", ".join(cmd.aliases)}`') for key, val in help_dict.items(): try: val = val.format(prefix=bot.prefix) except KeyError: val = val.replace('{prefix}', bot.prefix) embed.add_field(name=key, value=val, inline=False) return embed
def generate_report(args): verbose_to_log = { 0: logging.CRITICAL, 1: logging.ERROR, 2: logging.WARN, 3: logging.INFO, 4: logging.DEBUG } logging_level = logging.DEBUG if args.verbose > 4 else verbose_to_log[args.verbose] log.setLevel(logging_level) log.debug("args: %s" % args) args.output_state_results = True if args.verbose > 1 else args.output_state_results if args.job_group_urls: root_url = urljoin('/'.join(args.job_group_urls.split("/")[0:3]), '/') else: root_url = urljoin(args.host, '/') browser = Browser(args, root_url) job_groups = get_job_groups(browser, root_url, args) assert not (args.builds and len(job_groups) > 1), "builds option and multiple job groups not supported" assert len(job_groups) > 0, "No job groups were found, maybe misspecified '--job-groups'?" return Report(browser, args, root_url, job_groups)
def create_console_handler(verbose_level): clog = logging.StreamHandler() formatter = ColoredFormatter( "%(log_color)s[%(asctime)s %(levelname)-8s%(module)s]%(reset)s " "%(white)s%(message)s", datefmt="%H:%M:%S", reset=True, log_colors={ 'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red', }) clog.setFormatter(formatter) if verbose_level == 0: clog.setLevel(logging.WARN) elif verbose_level == 1: clog.setLevel(logging.INFO) else: clog.setLevel(logging.DEBUG) return clog
def set_log(level, filename='spider.log'): """ return a log file object ??????log?? """ if not os.path.isdir(LOG_DIR): os.mkdir(LOG_DIR) log_file = os.path.join(LOG_DIR, filename) if not os.path.isfile(log_file): os.mknod(log_file) os.chmod(log_file, 0777) log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR, 'critical': logging.CRITICAL} logger_f = logging.getLogger('spider') logger_f.setLevel(logging.DEBUG) fh = logging.FileHandler(log_file,'a') fh.setLevel(log_level_total.get(level, logging.DEBUG)) formatter = logging.Formatter('%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s') fh.setFormatter(formatter) logger_f.addHandler(fh) keep_fds = [fh.stream.fileno()] return logger_f,keep_fds
def test_set_default_log_level(self, mock_connect): import logging from datastore import get_logger from logging.handlers import RotatingFileHandler from datastore.postgresstore import PostgresLogHandler self.dsb.add_file_db("config-example.json", logging.CRITICAL) self.dsb.add_postgres_db("", logging.WARN) self.dsb.set_default_log_level(logging.INFO) self.assertEqual(DataStore.LOG_LEVEL, logging.INFO) logger = get_logger() fdbh = None pdbh = None for handler in logger.handlers: if isinstance(handler, RotatingFileHandler): fdbh = handler if isinstance(handler, PostgresLogHandler): pdbh = handler self.assertEqual(fdbh.level, logging.CRITICAL) self.assertEqual(pdbh.level, logging.WARNING)
def emit(self, record): message = self.format(record) logger_name = record.name if record.name == "root": logger_name = LogManager.ROOT_LOGGER_NAME logger = LogManager.getLogger(logger_name) level = record.levelno if level == logging.DEBUG: logger.debug(message) elif level == logging.INFO: logger.info(message) elif level == logging.WARN: logger.warn(message) elif level == logging.ERROR: logger.error(message) elif level == logging.CRITICAL: logger.fatal(message) else: logger.fatal("unknown logger level: " + str(level))
def configure_logging(verbosity): """Reconfigure logging with selected verbosity Sets the root logger and updates the args so oauth logging will also be configured properly Args: args (Object): Application args verbosity (int): The logging veribisty... 0: WARN 1: INFO >1: DEBUG """ if verbosity <= 0: level = logging.WARN elif verbosity == 1: level = logging.INFO else: assert verbosity > 1 level = logging.DEBUG logging.basicConfig(level=level)
def DEFAULT_LOGGING_CONFIG(level=logging.WARN, format=LOG_FORMAT): """Returns a default logging config in dict format. Compatible with logging.config.dictConfig(), this default set the root logger to `level` with `sys.stdout` console handler using a formatter initialized with `format`. A simple 'brief' formatter is defined that shows only the message portion any log entries.""" return { "version": 1, "formatters": {"generic": {"format": format}, "brief": {"format": "%(message)s"}, }, "handlers": {"console": {"class": "logging.StreamHandler", "level": "NOTSET", "formatter": "generic", "stream": "ext://sys.stdout", }, }, "root": {"level": level, "handlers": ["console"], }, "loggers": {}, }
def printDebug(message, priority=logging.WARN): """ Logs message given the priority specified arguments: message - the string message to be logged priority - the integer priority of the message; uses the priority levels in the logging module returns: nothing """ # this function (and hence the library) originally did not use the logging module from the standard library global sparki_logger # for compatibility, we will recognize the "old" priority levels, but new code should be written to conform to the # priority levels in the logging module if priority == DEBUG_DEBUG or priority == logging.DEBUG: sparki_logger.debug(message) elif priority == DEBUG_INFO or priority == logging.INFO: sparki_logger.info(message) elif priority == DEBUG_WARN or priority == logging.WARN: sparki_logger.warn(message) elif priority == DEBUG_ERROR or priority == logging.ERROR: sparki_logger.error(message) else: sparki_logger.critical(message)
def setup_logging(args): """Setup the logging level and configure the basic logger """ if args.verbose == 1: level = logging.INFO elif args.verbose >= 2: level = logging.DEBUG else: level = logging.WARN logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s", level=level, ) global LOG LOG = logging.getLogger(__name__)