我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用logging.config.dictConfig()。
def init(self):
    # Parse config/CLI options, configure logging, and open the DataStore.
    # Returns (store, remaining_argv); returns (None, []) when help was requested.
    # NOTE(review): Python 2 code (print statement below).
    import DataStore, readconf, logging, sys
    # Seed defaults so parse_argv recognizes these keys.
    self.conf.update({ "debug": None, "logging": None })
    self.conf.update(DataStore.CONFIG_DEFAULTS)
    args, argv = readconf.parse_argv(self.argv, self.conf, strict=False)
    if argv and argv[0] in ('-h', '--help'):
        print self.usage()
        return None, []
    # Default logging: everything at DEBUG to stdout, bare messages.
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.DEBUG,
        format="%(message)s")
    if args.logging is not None:
        # Optional dictConfig-schema logging settings from the config file
        # override the basicConfig defaults above.
        import logging.config as logging_config
        logging_config.dictConfig(args.logging)
    store = DataStore.new(args)
    return store, argv

# Abstract hex-binary conversions for eventual porting to Python 3.
def setup_logging(filename):
    """Setup logging based on a json string.

    :type filename: str
    :param filename: Log configuration file.
    :raises: IOError -- If the file does not exist.
    :raises: ValueError -- If the file is an invalid json file.
    """
    # The original wrapped this body in `try/except (IOError, ValueError):
    # raise`, which only re-raised unchanged -- a no-op removed here;
    # both exceptions still propagate to the caller exactly as documented.
    config = parse_json_file(filename)
    log_conf.dictConfig(config)
    # Quiet chatty third-party libraries down to ERROR.
    logpy4j = logging.getLogger("py4j")
    logpy4j.setLevel(logging.ERROR)
    logkafka = logging.getLogger("kafka")
    logkafka.setLevel(logging.ERROR)
def test_dict_config(self):
    """Verify that dictConfig-based setup only notifies slack when the
    `notify_slack` extra flag is set on the log record."""
    dictConfig(DICT_CONFIG)
    test_logger = logging.getLogger('dict_config')
    # Stub the webhook endpoint so no real HTTP request is made.
    self.rm.post('https://my-super-hooks.com/12345', text='ok')
    # Without the extra flag the filter must suppress the post.
    test_logger.info("Test without filter dictConfig")
    self.assertEqual(self.rm.call_count, 0)
    # With the flag set, exactly one webhook call is expected.
    test_logger.info("Test with dictConfig", extra={'notify_slack': True})
    self.assertEqual(self.rm.call_count, 1)
    self._assert_has_attachment("Test with dictConfig", "good")
def run():
    """Start wechat client."""
    # Apply the module-level logging configuration before anything logs.
    config.dictConfig(LOGGING)
    log = getLogger('client')
    wechat_session = Session()
    sync_client = SyncClient(wechat_session)
    # Blocks until the sync loop finishes.
    sync_session(sync_client)
    log.info('process down...')
def run(**kwargs):
    """Start wechat client."""
    # Queues/events shared between the session, reply, and input threads.
    in_queue = queue.Queue()
    message_queue = queue.Queue()
    login_evt = threading.Event()
    exit_evt = threading.Event()
    config.dictConfig(LOGGING)
    log = getLogger('client')
    wechat_session = Session()
    client = SyncClient(wechat_session)
    session_worker = threading.Thread(
        target=sync_session,
        args=(client, in_queue, login_evt, exit_evt))
    reply_worker = threading.Thread(
        target=reply_message,
        args=(client, message_queue, login_evt, exit_evt))
    session_worker.start()
    reply_worker.start()
    # Runs on the main thread until exit is signalled.
    show_input_message(client, in_queue, message_queue, exit_evt)
    session_worker.join()
    reply_worker.join()
    log.info('process down...')
def enable_debug(to_file=True, filename='KubeObjHelper.log', reset_logfile=True):
    """Enable DEBUG-level logging for the 'openshift.helper' logger.

    :param to_file: when True, log to *filename*; otherwise log to stderr
        via a StreamHandler.
    :param filename: path of the debug log file (only used when to_file).
    :param reset_logfile: when True, truncate the log file on each call;
        otherwise append.
    :return: None
    """
    # NOTE: the original dict carried top-level 'level' and (misspelled)
    # 'propogate' keys; dictConfig ignores unknown top-level keys, so they
    # were dead configuration and have been removed.
    logger_config = {
        'version': 1,
        'loggers': {
            'openshift.helper': {
                'handlers': ['debug_logger'],
                'level': 'DEBUG',
                'propagate': False
            }
        }
    }
    if to_file:
        # 'w' truncates (one fresh file per run); 'a' appends across runs.
        mode = 'w' if reset_logfile else 'a'
        logger_config['handlers'] = {
            'debug_logger': {
                'class': 'logging.FileHandler',
                'level': 'DEBUG',
                'filename': filename,
                'mode': mode,
                'encoding': 'utf-8'
            }
        }
    else:
        logger_config['handlers'] = {
            'debug_logger': {
                'class': 'logging.StreamHandler',
                'level': 'DEBUG'
            }
        }
    logging_config.dictConfig(logger_config)
def commandline():
    """
    Entrypoint for openshift-ansible-gen
    :return: None
    """
    parser = argparse.ArgumentParser(description=u'Uses the OpenShift API models to generate Ansible artifacts')
    parser.add_argument('--debug', action='store_true', dest='debug',
                        help=u'enable debug output', default=False)
    subparsers = parser.add_subparsers(title='subcommand', dest='subcommand')
    subparsers.required = True
    # Register a subparser for every available subcommand that defines one.
    for subcommand in AVAILABLE_COMMANDS:
        if globals().get('subcmd_%s_parser' % subcommand):
            subparser = subparsers.add_parser(subcommand, help=AVAILABLE_COMMANDS[subcommand])
            globals()['subcmd_%s_parser' % subcommand](parser, subparser)
    args = parser.parse_args()
    if args.debug:
        # enable debug output
        LOGGING['loggers']['openshift.ansiblegen']['level'] = 'DEBUG'
    config.dictConfig(LOGGING)
    if args.subcommand == 'help':
        parser.print_help()
        sys.exit(0)
    elif args.subcommand == 'version':
        logger.info("{0} version is {1}".format(__name__, __version__))
        sys.exit(0)
    elif args.subcommand == 'modules':
        if args.suppress_stdout:
            # disable output
            LOGGING['loggers']['openshift.ansiblegen']['level'] = 'CRITICAL'
    # The original wrapped this dispatch in `try/except Exception: raise`,
    # a no-op re-raise; removed -- exceptions still propagate unchanged.
    globals()['run_{}_cmd'.format(args.subcommand)](**vars(args))
    sys.exit(0)
def setup_logging(disable_existing_loggers=None):
    """Setup global logging.

    This uses the loaded config settings to set up the logging.

    Args:
        disable_existing_loggers (boolean): If we would like to disable
            the existing loggers when creating this one.
            None means use the default from the config, True and False
            overwrite the config.
    """
    conf = get_logging_configuration_dict()
    if disable_existing_loggers is not None:
        # BUGFIX: the original unconditionally assigned True here, so an
        # explicit False was silently treated as True, contradicting the
        # docstring. Propagate the caller's actual value instead.
        conf['disable_existing_loggers'] = disable_existing_loggers
    logging_config.dictConfig(conf)
def reset_logging():
    """Re-apply the logging settings from the current configuration.

    Commonly called after the logging configuration has been updated so
    that the changes take effect.
    """
    current_conf = get_logging_configuration_dict()
    logging_config.dictConfig(current_conf)
def configure_logging(self):
    """Configure logging from ``self.LOGGING``.

    Accepts three forms:
      * str  -- path to an ini-style file, passed to ``config.fileConfig``;
      * dict -- a dictConfig schema, passed to ``config.dictConfig``;
      * None -- disable all logging (unless the force-prevent flag is set).

    Raises:
        ConfigurationError: if LOGGING is any other type.
    """
    if isinstance(self.LOGGING, str):
        config.fileConfig(self.LOGGING)
    elif isinstance(self.LOGGING, dict):
        config.dictConfig(self.LOGGING)
    elif self.LOGGING is None:
        # Idiom fix: `is None` instead of `isinstance(x, type(None))`.
        # Disable all logging
        if not _FORCE_PREVENT_LOGGING_DISABLE:
            logging.disable(logging.CRITICAL)
    else:
        raise ConfigurationError('Invalid LOGGING: must be string, dict')
def start_loggers():
    """Create the log directory, apply the logging config, and bind the
    module-level 'clusterfuzz' logger, rolling any rotating file handlers."""
    global logger
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    config.dictConfig(logging_config)
    logger = logging.getLogger('clusterfuzz')
    # Force rolling a log file; each log file represents a single run.
    rotating_handlers = [
        h for h in logger.handlers
        if isinstance(h, logging.handlers.RotatingFileHandler)
    ]
    for h in rotating_handlers:
        h.doRollover()
def setup_logging(filename):
    """Configure logging from a JSON file.

    :param filename: path to a JSON file containing a dictConfig schema.
    """
    with open(filename, "rt") as config_file:
        log_conf.dictConfig(json.load(config_file))
def setup_logging():
    """Configure logging from the default JSON config file that lives
    next to this module."""
    config_path = os.path.join(os.path.dirname(__file__),
                               DEFAULT_LOGGING_CONFIG_FILE)
    with open(config_path, "rt") as config_file:
        log_conf.dictConfig(json.load(config_file))
def load_yaml(config_path):
    """Load a YAML logging configuration file and apply it via dictConfig.

    :param config_path: path to a YAML file containing a dictConfig schema.
    """
    with open(config_path, 'r') as f:
        # SECURITY/DEPRECATION FIX: yaml.load without an explicit Loader is
        # deprecated and can instantiate arbitrary Python objects from
        # untrusted input; safe_load restricts to plain YAML types, which is
        # all a logging config needs.
        conf = yaml.safe_load(f)
    lconfig.dictConfig(conf)
def logger():
    """Apply the module's dict logging config and return a logger.

    NOTE(review): the logger is looked up by ``config.LOG_LEVEL`` — using a
    level value as a logger *name* looks suspicious; verify against callers.
    """
    log_conf.dictConfig(log_config)
    logger_name = config.LOG_LEVEL
    return logging.getLogger(logger_name)
def run(self):
    """Run command."""
    # Imports deferred to call time to keep module import cheap.
    from container import core
    from container.cli import LOGGING
    from logging import config
    if self.debug:
        LOGGING['loggers']['container']['level'] = 'DEBUG'
    config.dictConfig(LOGGING)
    core.hostcmd_prebake(self.distros,
                         debug=self.debug,
                         cache=self.cache,
                         ignore_errors=self.ignore_errors)
def conductor_commandline():
    """Parse conductor CLI arguments, configure logging, and dispatch the
    requested conductor command."""
    sys.stderr.write('Parsing conductor CLI args.\n')
    arg_parser = argparse.ArgumentParser(description=u'This should not be invoked '
                                                     u'except in a container by '
                                                     u'Ansible Container.')
    arg_parser.add_argument('command', action='store', help=u'Command to run.',
                            choices=['build', 'deploy', 'install', 'push', 'run',
                                     'restart', 'stop', 'destroy'])
    arg_parser.add_argument('--project-name', action='store', help=u'Project name.',
                            required=True)
    arg_parser.add_argument('--engine', action='store', help=u'Engine name.',
                            required=True)
    arg_parser.add_argument('--params', action='store', required=False,
                            help=u'Encoded parameters for command.')
    arg_parser.add_argument('--config', action='store', required=True,
                            help=u'Encoded Ansible Container config.')
    arg_parser.add_argument('--encoding', action='store', choices=['b64json'],
                            help=u'Encoding used for parameters.',
                            default='b64json')
    args = arg_parser.parse_args()
    # Decoder is resolved by name from the chosen encoding (e.g. decode_b64json).
    decode = globals()['decode_%s' % args.encoding]
    params = decode(args.params) if args.params else {}
    if params.get('debug'):
        LOGGING['loggers']['container']['level'] = 'DEBUG'
    config.dictConfig(LOGGING)
    containers_config = decode(args.config)
    conductor_config = AnsibleContainerConductorConfig(
        list_to_ordereddict(containers_config))
    logger.debug('Starting Ansible Container Conductor: %s', args.command,
                 services=conductor_config.services)
    command_fn = getattr(core, 'conductorcmd_%s' % args.command)
    command_fn(args.engine,
               args.project_name,
               conductor_config.services,
               volume_data=conductor_config.volumes,
               repository_data=conductor_config.registries,
               secrets=conductor_config.secrets,
               **params)
def set_logging_config(config, debug, verbosity, uncaught_logger, uncaught_handler):
    """Apply *config* via dictConfig, route warnings through logging, and
    install an excepthook that logs uncaught exceptions.

    Returns the logger used for uncaught exceptions.
    """
    # configure logging globally
    import logging.config as _logging_config
    _logging_config.dictConfig(config)

    # make sure we log any warnings
    log.captureWarnings(True)

    import warnings
    deprecation_categories = (DeprecationWarning, PendingDeprecationWarning)
    if verbosity > 2:
        # Highest verbosity: surface every warning, every time.
        warnings.simplefilter("always")
    elif debug or verbosity > 0:
        for category in deprecation_categories:
            warnings.simplefilter("always", category=category)

    # make sure we also log any uncaught exceptions
    logger = log.getLogger(__name__ if uncaught_logger is None else uncaught_logger)

    if uncaught_handler is None:
        def _log_uncaught(exc_type, exc_value, exc_tb):
            logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_tb))
        uncaught_handler = _log_uncaught
    sys.excepthook = uncaught_handler

    return logger
def main(argv):
    # CLI entry point for the Abe block-chain browser (Python 2 code:
    # print statements below). Returns a process exit status (0/1).
    if argv[0] == '--show-policy':
        # Describe one or more chain policies (all of them if none named).
        for policy in argv[1:] or list_policies():
            show_policy(policy)
        return 0
    elif argv[0] == '--list-policies':
        print("Available chain policies:")
        for name in list_policies():
            print(" %s" % name)
        return 0

    args, argv = readconf.parse_argv(argv, create_conf())

    if not argv:
        pass
    elif argv[0] in ('-h', '--help'):
        # NOTE(review): this usage text was reconstructed from a
        # whitespace-mangled source; original line breaks are approximate.
        print ("""Usage: python -m Abe.abe [-h] [--config=FILE] [--CONFIGVAR=VALUE]...
A Bitcoin block chain browser.

  --help                    Show this help message and exit.
  --version                 Show the program version and exit.
  --print-htdocs-directory  Show the static content directory name and exit.
  --list-policies           Show the available policy names for --datadir.
  --show-policy POLICY...   Describe the given policy.
  --query /q/COMMAND        Show the given URI content and exit.
  --config FILE             Read options from FILE.

All configuration variables may be given as command arguments.
See abe.conf for commented examples.""")
        return 0
    elif argv[0] in ('-v', '--version'):
        print ABE_APPNAME, ABE_VERSION
        print "Schema version", DataStore.SCHEMA_VERSION
        return 0
    elif argv[0] == '--print-htdocs-directory':
        print find_htdocs()
        return 0
    else:
        sys.stderr.write(
            "Error: unknown option `%s'\n"
            "See `python -m Abe.abe --help' for more information.\n"
            % (argv[0],))
        return 1

    # Quiet (ERROR) logging in --query mode; verbose (DEBUG) otherwise.
    logging.basicConfig(
        stream=sys.stdout,
        level = logging.DEBUG if args.query is None else logging.ERROR,
        format=DEFAULT_LOG_FORMAT)
    if args.logging is not None:
        # Optional dictConfig-schema settings override basicConfig above.
        import logging.config as logging_config
        logging_config.dictConfig(args.logging)

    # AGPL compliance mode needs tarfile to serve its own source.
    if args.auto_agpl:
        import tarfile

    store = make_store(args)
    if (not args.no_serve):
        serve(store)
    return 0