The following code examples, extracted from open-source Python projects, illustrate how to use the logging.config module.
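Before the project examples, here is a minimal, self-contained sketch of the module's main entry point, dictConfig (the logger name "demo" is purely illustrative):

import logging
import logging.config

# Minimal dictConfig schema: "version" is mandatory, everything else optional.
logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "simple"},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
})

logging.getLogger("demo").info("configured via dictConfig")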
def init(self):
    import DataStore, readconf, logging, sys
    self.conf.update({"debug": None, "logging": None})
    self.conf.update(DataStore.CONFIG_DEFAULTS)
    args, argv = readconf.parse_argv(self.argv, self.conf, strict=False)
    if argv and argv[0] in ('-h', '--help'):
        print(self.usage())
        return None, []
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                        format="%(message)s")
    if args.logging is not None:
        import logging.config as logging_config
        logging_config.dictConfig(args.logging)
    store = DataStore.new(args)
    return store, argv

# Abstract hex-binary conversions for eventual porting to Python 3.
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL and not an Engine,
    though an Engine is acceptable here as well. By skipping the Engine
    creation we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
def configure_logging(debug=False, verbose=True, stderr=True):
    config = copy.deepcopy(LOG_CONFIG)
    for handler in config["handlers"].values():
        if verbose:
            handler["level"] = "INFO"
        if debug:
            handler["level"] = "DEBUG"
    if verbose:
        config["handlers"]["stderr"]["formatter"] = "verbose"
    if debug:
        config["handlers"]["stderr"]["formatter"] = "debug"
    if stderr:
        config["loggers"][LOG_NAMESPACE]["handlers"].append("stderr")
    logging.config.dictConfig(config)
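LOG_CONFIG and LOG_NAMESPACE are defined elsewhere in that project and are not shown here; a purely hypothetical shape that would satisfy the function above might be:

# Hypothetical module-level defaults; the real project's values differ.
LOG_NAMESPACE = "myapp"  # assumed logger namespace

LOG_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "verbose": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
        "debug": {"format": "%(asctime)s %(name)s %(levelname)s "
                            "%(pathname)s:%(lineno)d %(message)s"},
    },
    "handlers": {
        "stderr": {"class": "logging.StreamHandler", "level": "WARNING"},
    },
    "loggers": {
        LOG_NAMESPACE: {"level": "DEBUG", "handlers": []},
    },
}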
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
    path = default_path
    self.logconf = None
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        with open(path, 'rt') as f:
            config = json.load(f)
            self.logconf = logging.config.dictConfig(config)
    elif os.path.exists(path.replace("../", "")):
        with open(path.replace("../", ""), 'rt') as f:
            config = json.load(f)
            self._changePath(config["handlers"])
            self.logconf = logging.config.dictConfig(config)
    else:
        print("Log configuration not found (\"%s\"): applying default settings" % path)
        self.logconf = logging.basicConfig(level=default_level)
def _get_config(key, default_value=None, required=False):
    """
    Gets config from environment variables.
    Will return default_value if key is not in environment variables.

    :param key: the key of the env variable you are looking for
    :param default_value: value to return if key not in os.environ
    :param required: if true and key is not set, will raise InvalidConfigException
    :return: os.environ[key] if key in os.environ else default_value
    :exception InvalidConfigException: raised when a required config key is not properly set
    """
    if required and key not in os.environ:
        raise InvalidConfigException("Invalid ENV variable. Please check {0}".format(key))
    to_return = os.environ.get(key, default_value)
    if isinstance(to_return, basestring):
        try:
            to_return = _string_to_bool(to_return)
        except NonBooleanStringException:
            pass
    os.environ[key] = str(to_return)
    return to_return
def setup_logging(
    default_path='logging.ini',
    default_level=logging.INFO,
    env_key='LOG_CFG'
):
    """Setup logging configuration."""
    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        logging.config.fileConfig(path)  # use the resolved path, not default_path
    else:
        logging.basicConfig(level=default_level)
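The logging.ini itself is not part of the source; for reference, a minimal file in the format fileConfig expects looks like this:

[loggers]
keys=root

[handlers]
keys=console

[formatters]
keys=simple

[logger_root]
level=INFO
handlers=console

[handler_console]
class=StreamHandler
level=INFO
formatter=simple
args=(sys.stdout,)

[formatter_simple]
format=%(asctime)s %(name)s %(levelname)s %(message)s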
def configure_logging(logging_config, logging_settings):
    if not sys.warnoptions:
        # Route warnings through python logging
        logging.captureWarnings(True)
        # RemovedInNextVersionWarning is a subclass of DeprecationWarning which
        # is hidden by default, hence we force the "default" behavior
        warnings.simplefilter("default", RemovedInNextVersionWarning)

    if logging_config:
        # First find the logging configuration function ...
        logging_config_func = import_string(logging_config)

        logging.config.dictConfig(DEFAULT_LOGGING)

        # ... then invoke it with the logging settings
        if logging_settings:
            logging_config_func(logging_settings)
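This snippet appears to match Django's django.utils.log.configure_logging, which Django invokes at setup as configure_logging(settings.LOGGING_CONFIG, settings.LOGGING); by default LOGGING_CONFIG names logging.config.dictConfig, so the project's LOGGING dict is applied after Django's DEFAULT_LOGGING.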
def _applyConfigurationToValues(self, parser, config, values):
    for name, value, filename in config:
        if name in option_blacklist:
            continue
        try:
            self._processConfigValue(name, value, values, parser)
        except NoSuchOptionError as exc:
            self._file_error(
                "Error reading config file %r: "
                "no such option %r" % (filename, exc.name),
                name=name, filename=filename)
        except optparse.OptionValueError as exc:
            msg = str(exc).replace('--' + name, repr(name), 1)
            self._file_error("Error reading config file %r: "
                             "%s" % (filename, msg),
                             name=name, filename=filename)
def process_alert(self, route):
    recipients = list(set(route["recipient_ids"]))

    logging.debug("Processing alert for {}, recipients {}".format(
        str(route), str(recipients)
    ))

    if "*" in self.data:
        recipients.append("*")

    for recipient_id in recipients:
        if recipient_id not in self.data:
            continue
        recipient = self.data[recipient_id]

        if len(recipient["routes"]) < recipient["config"]["max_routes"]:
            recipient["routes"].append(route)
        else:
            logging.debug("Discarding route {} for {}: buffer full".format(
                route["prefix"], recipient_id
            ))
def _flush_recipient(self, recipient):
    if not isinstance(recipient["config"]["info"]["email"], list):
        email_addresses = [recipient["config"]["info"]["email"]]
    else:
        email_addresses = list(set(recipient["config"]["info"]["email"]))

    logging.info("Sending email to {} ({}) for {}".format(
        recipient["id"],
        ", ".join(email_addresses),
        ", ".join([route["prefix"] for route in recipient["routes"]])
    ))

    data = {
        "id": recipient["id"],
        "from_addr": self.from_addr,
        "subject": self.subject,
        "routes_list": self._format_list_of_routes(recipient["routes"])
    }
    msg = MIMEText(self.template.format(**data))
    msg['Subject'] = self.subject
    msg['From'] = self.from_addr
    msg['To'] = ", ".join(email_addresses)

    self._send_email(self.from_addr, email_addresses, msg.as_string())
def setup_logging(verbosity_level, save_debug_log):
    logging.captureWarnings(True)

    # if config['logging']['config_file']:
    #     # Logging config from file must be read before other handlers are
    #     # added. If not, the other handlers will have no effect.
    #     try:
    #         path = config['logging']['config_file']
    #         logging.config.fileConfig(path, disable_existing_loggers=False)
    #     except Exception as e:
    #         # Catch everything as logging does not specify what can go wrong.
    #         logger.error('Loading logging config %r failed. %s', path, e)

    setup_console_logging(verbosity_level)

    if save_debug_log:
        print('Here we would call setup_debug_logging_to_file(config)')
        # setup_debug_logging_to_file(config)

    _delayed_handler.release()
def setup_logger():
    """Return a logging obj with a default ColoredFormatter."""
    from gettext import gettext as _  # noqa
    try:
        # Import inside the try block so a missing colorlog is caught below
        from colorlog import ColoredFormatter
        formatter = ColoredFormatter(
            "%(asctime)s %(name)-12s (%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s (%(funcName)-5s) %(message_log_color)s%(message)s",  # noqa
            datefmt=None,
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
                'TRACE': 'purple'
            },
            secondary_log_colors={
                'message': {
                    'ERROR': 'red',
                    'CRITICAL': 'red',
                    'DEBUG': 'yellow',
                    'INFO': 'yellow,bg_blue'
                }
            },
            style='%'
        )
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logging.getLogger('').addHandler(handler)
        logging.root.setLevel(logging.DEBUG)
    except ImportError:
        # No color available, use default config
        logging.basicConfig(format='%(levelname)s: %(message)s')
        logging.warning("Disabling color, you really want to install colorlog.")
def setup_logging(default_path='logging.json', default_level=logging.INFO, env_key='LOG_CFG'):
    """Setup logging configuration."""
    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        with open(path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)

    socketHandler = logging.handlers.DatagramHandler(
        'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
    rootLogger = logging.getLogger('')
    rootLogger.addHandler(socketHandler)
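A note on the last few lines: DatagramHandler sends each record as a pickled dict over UDP (logging.handlers.DEFAULT_UDP_LOGGING_PORT is 9021), so a listener must be running on that port to unpickle the records; since UDP is connectionless, they are silently dropped otherwise.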
def main(argv):
    """
    MySQL binlog to Google Pub/Sub entry point

    Args:
        argv (list): list of command line arguments
    """
    args = _setup_arg_parser(argv)

    conf_file = args.conf
    if conf_file:
        os.environ['BINLOG2GPUBSUB_CONF_FILE'] = conf_file

    if args.logconf:
        logging.config.fileConfig(args.logconf, disable_existing_loggers=False)
    else:
        logging.basicConfig()
    if args.loglevel:
        logging.root.setLevel(logging.getLevelName(args.loglevel.upper()))

    import mysqlbinlog2gpubsub
    mysqlbinlog2gpubsub.start_publishing()
def extract_batch(dataset, config):
    with tf.device("/cpu:0"):
        bboxer = PriorBoxGrid(config)
        data_provider = slim.dataset_data_provider.DatasetDataProvider(
            dataset, num_readers=2,
            common_queue_capacity=512, common_queue_min=32)
        if args.segment:
            im, bbox, gt, seg = data_provider.get(
                ['image', 'object/bbox', 'object/label', 'image/segmentation'])
        else:
            im, bbox, gt = data_provider.get(['image', 'object/bbox', 'object/label'])
            seg = tf.expand_dims(tf.zeros(tf.shape(im)[:2]), 2)

        im = tf.to_float(im) / 255
        bbox = yxyx_to_xywh(tf.clip_by_value(bbox, 0.0, 1.0))
        im, bbox, gt, seg = data_augmentation(im, bbox, gt, seg, config)
        inds, cats, refine = bboxer.encode_gt_tf(bbox, gt)

        return tf.train.shuffle_batch([im, inds, refine, cats, seg],
                                      args.batch_size, 2048, 64, num_threads=4)
def __init__(self, pathname, **settings):
    """initial config for singleton baka framework

    :param pathname: the name of the application package
    :param settings: optional dict settings for pyramid configuration
    """
    self.import_name = pathname
    self.settings = settings
    self.__include = {}
    self.__trafaret = trafaret_yaml

    # Only set up a default log handler if the
    # end-user application didn't set anything up.
    if not (logging.root.handlers and log.level == logging.NOTSET and settings.get('LOGGING')):
        formatter = logging.Formatter(logging_format)
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        log.addHandler(handler)
        log.setLevel(logging.INFO)
def resource(self, path, **kwargs):
    def decorator(wrapped, depth=1):
        route_name = kwargs.pop("route_name", None)
        route_name = route_name or wrapped.__name__
        route_name = kwargs.pop("name", route_name)
        wrapped.route_name = route_name

        def callback(scanner, name, cls):
            config = scanner.config.with_package(info.module)
            config.add_route(route_name, path, factory=cls)
            config.add_view(default_options_view, route_name=route_name,
                            request_method='OPTIONS',
                            permission=NO_PERMISSION_REQUIRED)
            config.add_view(unsupported_method_view, route_name=route_name,
                            renderer='json')

        for method in METHODS:
            setattr(wrapped, method,
                    type('ViewDecorator%s' % method,
                         (ViewDecorator, object),
                         {'request_method': method,
                          'state': wrapped,
                          'kwargs': kwargs}))

        info = venusian.attach(wrapped, callback, 'pyramid', depth=depth)
        return wrapped
    return decorator
def run(self, host=None, port=None, **options):
    """Application runner for the development stage; not for production.

    :param host: url host application server
    :param port: number of port
    :param options: dict options for werkzeug wsgi server
    """
    settings = self.config.get_settings()
    _host = '127.0.0.1'
    _port = 5000

    host = host or _host
    port = int(port or _port)
    options.setdefault('use_reloader', settings.get('debug_all'))
    options.setdefault('use_debugger', settings.get('debug_all'))

    from werkzeug.serving import run_simple
    run_simple(host, port, self.config.make_wsgi_app(), **options)
def _logging_config(config_parser, disable_existing_loggers=False):
    """
    Helper that allows us to use an existing ConfigParser object to load
    logging configurations instead of a filename.

    Note: this code is essentially copy pasta from `logging.config.fileConfig`
    except we skip loading the file.
    """
    formatters = logging.config._create_formatters(config_parser)

    # critical section
    logging._acquireLock()
    try:
        logging._handlers.clear()
        del logging._handlerList[:]
        # Handlers add themselves to logging._handlers
        handlers = logging.config._install_handlers(config_parser, formatters)
        logging.config._install_loggers(config_parser, handlers,
                                        disable_existing_loggers)
    finally:
        logging._releaseLock()
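A hypothetical caller (names assumed, not from the source) could then feed an in-memory configuration to the helper without ever touching a file:

import configparser

# The INI text reuses the fileConfig format sketched earlier; no file needed.
ini_text = """
[loggers]
keys=root

[handlers]
keys=console

[formatters]
keys=simple

[logger_root]
level=WARNING
handlers=console

[handler_console]
class=StreamHandler
formatter=simple
args=(sys.stderr,)

[formatter_simple]
format=%(levelname)s %(message)s
"""

parser = configparser.ConfigParser()
parser.read_string(ini_text)
_logging_config(parser)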
def main(args=None):
    args = get_parser_args(args)
    _logging_config(args.config)
    logger = logging.getLogger(__name__)

    logger.info("Loading configurations")
    slackbot_config = resources.SlackBotConfig.from_config(args.config)

    # Since we can't inject the settings into the bot, let's load all the
    # settings into the module
    slackbot_config.load_into_settings_module(slackbot.settings)
    # Load the config into the settings...
    # TODO: PR to be able to inject settings instead of auto magically
    # loading them from a module
    slackbot.settings.SLACK_JIRA_CONF = args.config

    logger.info("Starting slackbot")
    bot = slackbot.bot.Bot()
    bot.run()
def my_log():
    # Configure the root logger to write to message.log
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        handlers=[logging.FileHandler('message.log', 'a', 'utf-8')])
    _log = logging.getLogger('app.' + __name__)
    host = '10.0.0.175'
    port = 8080
    # Pass arguments instead of pre-formatting with 'xxxx' % (aa, bb):
    # interpolation is deferred until the record is actually emitted
    _log.error('error to connect to %s:%d', host, port)
    _log.addFilter(FilterFunc('foo'))  # drop records emitted from foo()

    lgg = logging.getLogger('app.network.client')
    lgg.propagate = False  # do not propagate records to ancestor loggers
    lgg.error('do you see me?')  # no longer reaches the root file handler
    lgg.setLevel(logging.CRITICAL)
    lgg.error('now you see me?')  # dropped: below the CRITICAL threshold

    logging.disable(logging.DEBUG)  # disable all messages at DEBUG and below
    # Loading a logging config from a file is normally done once, in main()
    config.fileConfig('applogcfg.ini')
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.

    :param config_file_path: file path to logging configuration file.
        https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: defaults to logging.INFO
    :return: None - access the logger by name as described in the config, or the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is a subclass of ValueError in Python 3.5+
        logging.basicConfig(level=log_level)
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath "
            "exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
def _load_config():
    # Fills the global CONFIG dictionary using default and custom config.
    # Returns an error if the custom config is invalid.
    global CONFIG
    try:
        cfg = _load_default_config()
        custom_cfg = _load_custom_config()
        if custom_cfg:
            CONFIG = _merge(cfg, custom_cfg)
        else:
            CONFIG = cfg
    except yaml.YAMLError as exc:
        # Try to point to the line that threw an error
        if hasattr(exc, 'problem_mark'):
            mark = exc.problem_mark
            return 'Error in YAML at position: ({}:{})'.format(mark.line + 1, mark.column + 1)
def get(entity, param):
    """
    Returns the configuration value belonging to a specified entity
    (e.g. neo4j) and parameter (e.g. host).

    :param entity: The configuration entity
    :param param: The configuration parameter
    :return: The configuration value
    :raises ValueError: if a requested parameter is not configured
    """
    try:
        value = _get_config()[entity][param]
        LOGGER.debug('Found config: {}:{} => {}'.format(entity, param, value))
        return value
    except KeyError:
        # Should _never_ happen in production!
        msg = 'Parameter {} is not present for entity {}!'.format(param, entity)
        LOGGER.critical(msg)
        raise ValueError(msg)
def _setup_requests(app):
    def _init_request():
        session = request.environ['beaker.session']
        session.save()
        _setup_connector(
            app=current_app,
            app_config=current_app.config,
            session=session
        )

    @app.before_request
    def before_request():
        init_request = _init_request()
        return init_request
def make_web_app():
    logging.config.dictConfig(config.LOGGING_CONFIG)
    settings = {
        'debug': constants.DEBUG,
        'template_path': os.path.join(
            os.path.dirname(__file__), "web", "templates"
        ),
        'static_path': os.path.join(
            os.path.dirname(__file__), 'web', 'static'
        ),
        'default_handler_class': BaseHandler
    }
    app = tornado.web.Application(url_handlers, **settings)
    return app