The following 49 code examples, extracted from open-source Python projects, illustrate how logging.DEBUG is used in practice.
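Before the project-specific examples, here is a minimal sketch of the common pattern they all build on (the logger name and message are illustrative):

import logging

# Route records at DEBUG and above to a default stderr handler
logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)
logger.debug("emitted only when the effective level is DEBUG or lower")
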
def create_logger():
    """ Setup the logging environment """
    log = logging.getLogger()  # root logger
    log.setLevel(logging.INFO)
    format_str = '%(asctime)s - %(levelname)-8s - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    if HAVE_COLORLOG and os.isatty(2):
        cformat = '%(log_color)s' + format_str
        colors = {'DEBUG': 'reset',
                  'INFO': 'reset',
                  'WARNING': 'bold_yellow',
                  'ERROR': 'bold_red',
                  'CRITICAL': 'bold_red'}
        formatter = colorlog.ColoredFormatter(cformat, date_format,
                                              log_colors=colors)
    else:
        formatter = logging.Formatter(format_str, date_format)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    log.addHandler(stream_handler)
    return logging.getLogger(__name__)

def init_logger(self, args):
    level = logging.INFO
    if args.verbose:
        level = logging.VERBOSE  # custom level; not in the stdlib (see the sketch below)
    if args.debug:
        level = logging.DEBUG
    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
                        level=level)
    Rthandler = RotatingFileHandler('arbitrage.log', maxBytes=100*1024*1024,
                                    backupCount=10)
    Rthandler.setLevel(level)
    formatter = logging.Formatter('%(asctime)-12s [%(levelname)s] %(message)s')
    Rthandler.setFormatter(formatter)
    logging.getLogger('').addHandler(Rthandler)

    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

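logging.VERBOSE, used above, is not part of the standard library; projects that reference it register a custom level first. A minimal sketch of how such a level might be set up (the numeric value and attribute assignment are assumptions, not this project's confirmed code):

import logging

VERBOSE = 15  # assumed value, between DEBUG (10) and INFO (20)
logging.addLevelName(VERBOSE, "VERBOSE")
logging.VERBOSE = VERBOSE  # some projects attach the constant to the module for convenience

logging.basicConfig(level=VERBOSE)
logging.log(VERBOSE, "emitted at the custom VERBOSE level")
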
def init(self):
    import DataStore, readconf, logging, sys
    self.conf.update({"debug": None, "logging": None})
    self.conf.update(DataStore.CONFIG_DEFAULTS)
    args, argv = readconf.parse_argv(self.argv, self.conf, strict=False)
    if argv and argv[0] in ('-h', '--help'):
        print(self.usage())
        return None, []
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                        format="%(message)s")
    if args.logging is not None:
        import logging.config as logging_config
        logging_config.dictConfig(args.logging)
    store = DataStore.new(args)
    return store, argv

# Abstract hex-binary conversions for eventual porting to Python 3.

def setup_logging(log_level=logging.INFO):
    """Set up the logging."""
    logging.basicConfig(level=log_level)
    fmt = ("%(asctime)s %(levelname)s (%(threadName)s) "
           "[%(name)s] %(message)s")
    colorfmt = "%(log_color)s{}%(reset)s".format(fmt)
    datefmt = '%Y-%m-%d %H:%M:%S'

    # Suppress overly verbose logs from libraries that aren't helpful
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    logging.getLogger('aiohttp.access').setLevel(logging.WARNING)

    try:
        from colorlog import ColoredFormatter
        logging.getLogger().handlers[0].setFormatter(ColoredFormatter(
            colorfmt, datefmt=datefmt, reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red',
            }
        ))
    except ImportError:
        pass

    logger = logging.getLogger('')
    logger.setLevel(log_level)

def setup_logging(verbose=0, colors=False, name=None):
    """Configure console logging. Info and below go to stdout, others go to stderr.

    :param int verbose: Verbosity level. > 0 print debug statements. > 1 passed to sphinx-build.
    :param bool colors: Print color text in non-verbose mode.
    :param str name: Which logger name to set handlers to. Used for testing.
    """
    root_logger = logging.getLogger(name)
    root_logger.setLevel(logging.DEBUG if verbose > 0 else logging.INFO)
    formatter = ColorFormatter(verbose > 0, colors)
    if colors:
        colorclass.Windows.enable()

    handler_stdout = logging.StreamHandler(sys.stdout)
    handler_stdout.setFormatter(formatter)
    handler_stdout.setLevel(logging.DEBUG)
    handler_stdout.addFilter(type('', (logging.Filter,),
                                  {'filter': staticmethod(lambda r: r.levelno <= logging.INFO)}))
    root_logger.addHandler(handler_stdout)

    handler_stderr = logging.StreamHandler(sys.stderr)
    handler_stderr.setFormatter(formatter)
    handler_stderr.setLevel(logging.WARNING)
    root_logger.addHandler(handler_stderr)

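A brief usage sketch of the stdout/stderr split above (messages are illustrative): with verbose=1 the logger accepts DEBUG, the stdout handler's filter keeps records at INFO and below, and the stderr handler takes WARNING and up.

setup_logging(verbose=1)
log = logging.getLogger(__name__)
log.debug("goes to stdout (filter keeps levelno <= INFO)")
log.warning("goes to stderr (handler level is WARNING)")
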
def kas(argv):
    """ The main entry point of kas. """
    create_logger()

    parser = kas_get_argparser()
    args = parser.parse_args(argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    logging.info('%s %s started', os.path.basename(sys.argv[0]), __version__)

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, interruption)
    atexit.register(_atexit_handler)

    for plugin in getattr(kasplugin, 'plugins', []):
        if plugin().run(args):
            return

    parser.print_help()

def receiver_proc2(task=None):
    # if server is in remote network, add it explicitly
    # scheduler = pycos.Pycos.instance()
    # yield scheduler.peer('remote.ip')
    # yield scheduler.peer(pycos.Location('remote.ip', tcp_port))
    rchannel = yield pycos.Channel.locate('2clients')
    # this task subscribes to the channel to get messages to server channel
    print('server is at %s' % rchannel.location)
    if (yield rchannel.subscribe(task)) != 0:
        raise Exception('subscription failed')
    sender = pycos.Task(sender_proc, rchannel)
    while True:
        msg = yield task.receive()
        print('Received "%s" from %s at %s' %
              (msg['msg'], msg['sender'].name, msg['sender'].location))
        if msg['msg'] is None and msg['sender'] == sender:
            break
    yield rchannel.unsubscribe(task)

# pycos.logger.setLevel(logging.DEBUG)

def init_logger(logger_name):
    # initialize logger
    log = logging.getLogger(logger_name)
    _h = logging.FileHandler('%s/%s' % (cfg.CONF.service.service_log_path,
                                        cfg.CONF.service.service_log_filename))
    _h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                      "%(lineno)s - %(levelname)s"
                                      " - %(message)s'"))
    log.addHandler(_h)
    if cfg.CONF.service.enable_debug_log_entries:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    return log

def configure_logging(self):
    """ Configure logging to log to std output as well as to log file """
    log_level = logging.DEBUG
    log_filename = datetime.now().strftime('%Y-%m-%d') + '.log'
    sp_logger = logging.getLogger('sp_logger')
    sp_logger.setLevel(log_level)
    formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')

    fh = logging.FileHandler(filename=self.log_dir + log_filename)
    fh.setLevel(log_level)
    fh.setFormatter(formatter)
    sp_logger.addHandler(fh)

    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(log_level)
    sh.setFormatter(formatter)
    sp_logger.addHandler(sh)

def main():
    import argparse

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler(sys.stdout))

    parser = argparse.ArgumentParser(description="Linux distro info tool")
    parser.add_argument(
        '--json',
        '-j',
        help="Output in machine readable format",
        action="store_true")
    args = parser.parse_args()

    if args.json:
        logger.info(json.dumps(info(), indent=4, sort_keys=True))
    else:
        logger.info('Name: %s', name(pretty=True))
        distribution_version = version(pretty=True)
        if distribution_version:
            logger.info('Version: %s', distribution_version)
        distribution_codename = codename()
        if distribution_codename:
            logger.info('Codename: %s', distribution_codename)

def add_stderr_logger(level=logging.DEBUG):
    """
    Helper for quickly adding a StreamHandler to the logger. Useful for
    debugging.

    Returns the handler after adding it.
    """
    # This method needs to be in this __init__.py to get the __name__ correct
    # even if urllib3 is vendored within another package.
    logger = logging.getLogger(__name__)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(level)
    logger.debug('Added a stderr logging handler to logger: %s', __name__)
    return handler

# ... Clean up.

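This particular helper ships with urllib3 itself, so outside of a vendored copy it is typically reached through the package. A minimal sketch:

import logging
import urllib3

# Enable urllib3's internal debug output; keep the handler to remove it later
handler = urllib3.add_stderr_logger(logging.DEBUG)
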
def grad_sparsity(self):
    global_state = self._global_state
    if self._iter == 0:
        global_state["sparsity_avg"] = 0.0
    non_zero_cnt = 0.0
    all_entry_cnt = 0.0
    for group in self._optimizer.param_groups:
        for p in group['params']:
            if p.grad is None:
                continue
            grad = p.grad.data
            grad_non_zero = grad.nonzero()
            if grad_non_zero.dim() > 0:
                non_zero_cnt += grad_non_zero.size()[0]
            all_entry_cnt += torch.numel(grad)
    beta = self._beta
    global_state["sparsity_avg"] = beta * global_state["sparsity_avg"] \
        + (1 - beta) * non_zero_cnt / float(all_entry_cnt)
    self._sparsity_avg = \
        global_state["sparsity_avg"] / self.zero_debias_factor()

    if DEBUG:
        logging.debug("sparsity %f, sparsity avg %f",
                      non_zero_cnt / float(all_entry_cnt), self._sparsity_avg)
    return

def get_cubic_root(self):
    # We have the equation x^2 D^2 + (1-x)^4 * C / h_min^2
    # where x = sqrt(mu).
    # We substitute x, which is sqrt(mu), with x = y + 1.
    # It gives y^3 + py = q
    # where p = (D^2 h_min^2)/(2*C) and q = -p.
    # We use Vieta's substitution to compute the root.
    # There is only one real solution y (which is in [0, 1]).
    # http://mathworld.wolfram.com/VietasSubstitution.html
    # eps in the numerator is to prevent momentum = 1 in case of zero gradient
    p = (self._dist_to_opt + eps)**2 * (self._h_min + eps)**2 / 2 / (self._grad_var + eps)
    w3 = (-math.sqrt(p**2 + 4.0 / 27.0 * p**3) - p) / 2.0
    w = math.copysign(1.0, w3) * math.pow(math.fabs(w3), 1.0/3.0)
    y = w - p / 3.0 / (w + eps)
    x = y + 1

    if DEBUG:
        logging.debug("p %f, den %f", p, self._grad_var + eps)
        logging.debug("w3 %f ", w3)
        logging.debug("y %f, den %f", y, w + eps)
    return x

def main(): """ Simple private telegram bot example. """ # Set up logging to log to stdout import logging logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) updater = Updater(TOKEN) dispatcher = updater.dispatcher dispatcher.add_handler(CommandHandler("start", start_handler)) # Enable admin commands for this bot AdminCommands(dispatcher) updater.start_polling() updater.idle()
def handleSslCommunication(self, deviceId, sslSocket):
    try:
        while True:
            payload = iotcommon.recvMessage(sslSocket)
            clientAddr = sslSocket.getpeername()
            self.logger.info("Received SSL payload from {0} at {1}:{2}: {3}".format(
                binascii.hexlify(deviceId), clientAddr[0], clientAddr[1], payload))
            if deviceId in self.sessions:
                session = self.sessions[deviceId]
            else:
                self.logger.debug("    creating new session for SSL device: %s",
                                  binascii.hexlify(deviceId))
                session = IotSession(deviceId, IotSession.TYPE_SSL)
                self.sessions[deviceId] = session
            session.lastUpdateTime = datetime.datetime.now()
            session.lastPayload = payload
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self.dumpSessions()
            self.passToHandler(deviceId, payload)
    except Exception as e:
        self.logger.exception(e)
        try:
            self.removeSession(deviceId)
            sslSocket.shutdown(socket.SHUT_RDWR)
            sslSocket.close()
        except:
            pass

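A side note on the level check in that loop: getEffectiveLevel() == logging.DEBUG matches only when the effective level is exactly DEBUG, while the stdlib's isEnabledFor(logging.DEBUG) also covers lower levels and is the usual guard for expensive debug-only work. A minimal sketch (expensive_dump is a hypothetical stand-in for costly diagnostics):

import logging

logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
    # Only compute the costly diagnostic when it will actually be emitted
    logger.debug("session state: %s", expensive_dump())  # expensive_dump is hypothetical
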
def init_logging(logfile, debug=True, level=None):
    """ Simple configuration of logging. """
    if debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    # allow user to override exact log_level
    if level:
        log_level = level

    logging.basicConfig(level=log_level,
                        format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
                        filename=logfile, filemode='a')
    return logging.getLogger("circus")

def ConvertLog4ToCFLevel(log4level):
    if log4level == logging.FATAL + 1:
        return CF.LogLevels.OFF
    if log4level == logging.FATAL:
        return CF.LogLevels.FATAL
    if log4level == logging.ERROR:
        return CF.LogLevels.ERROR
    if log4level == logging.WARN:
        return CF.LogLevels.WARN
    if log4level == logging.INFO:
        return CF.LogLevels.INFO
    if log4level == logging.DEBUG:
        return CF.LogLevels.DEBUG
    if log4level == logging.TRACE:  # TRACE is a project-defined level, not stdlib
        return CF.LogLevels.TRACE
    if log4level == logging.NOTSET:
        return CF.LogLevels.ALL
    return CF.LogLevels.INFO

def ConvertToLog4Level(newLevel):
    level = logging.INFO
    if newLevel == CF.LogLevels.OFF:
        level = logging.FATAL + 1
    if newLevel == CF.LogLevels.FATAL:
        level = logging.FATAL
    if newLevel == CF.LogLevels.ERROR:
        level = logging.ERROR
    if newLevel == CF.LogLevels.WARN:
        level = logging.WARN
    if newLevel == CF.LogLevels.INFO:
        level = logging.INFO
    if newLevel == CF.LogLevels.DEBUG:
        level = logging.DEBUG
    if newLevel == CF.LogLevels.TRACE:
        level = logging.TRACE
    if newLevel == CF.LogLevels.ALL:
        level = logging.TRACE
    return level

def run(self, args=None, namespace=None):
    options = self.parser.parse_args(args=args, namespace=namespace)
    enable_pretty_logging()

    logger = logging.getLogger(__name__)

    # todo configure_logger() method ?
    # note: getLogger('root') returns a logger literally named 'root',
    # which is not the same object as the root logger from getLogger()
    if options.debug:
        logging.getLogger('root').setLevel(logging.INFO)
    if options.verbose:
        if options.verbose >= 1:
            logging.getLogger('root').setLevel(logging.DEBUG)
        if options.verbose >= 2:
            # always DEBUG here, since verbose >= 2 within this branch
            logging.getLogger('sqlalchemy.engine').setLevel(
                logging.INFO if options.verbose < 2 else logging.DEBUG)

    try:
        handler = options.handler
    except AttributeError:
        if not callable(self.default_handler):
            raise
        handler = None
    return (handler or self.default_handler)(logger, options)

def __init__(self, appname, dllname=None, logtype="Application"):
    logging.Handler.__init__(self)
    try:
        import win32evtlogutil, win32evtlog
        self.appname = appname
        self._welu = win32evtlogutil
        if not dllname:
            dllname = os.path.split(self._welu.__file__)
            dllname = os.path.split(dllname[0])
            dllname = os.path.join(dllname[0], r'win32service.pyd')
        self.dllname = dllname
        self.logtype = logtype
        self._welu.AddSourceToRegistry(appname, dllname, logtype)
        self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
        self.typemap = {
            logging.DEBUG:    win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.INFO:     win32evtlog.EVENTLOG_INFORMATION_TYPE,
            logging.WARNING:  win32evtlog.EVENTLOG_WARNING_TYPE,
            logging.ERROR:    win32evtlog.EVENTLOG_ERROR_TYPE,
            logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
        }
    except ImportError:
        print("The Python Win32 extensions for NT (service, event "
              "logging) appear not to be available.")
        self._welu = None

def main():
    parser = build_cli_parser("Grab all binaries from a Cb server")
    parser.add_argument('-d', '--destdir', action='store',
                        help='Destination directory to place the events', default=os.curdir)
    # TODO: we don't have a control on the "start" value in the query yet
    # parser.add_argument('--start', action='store', dest='startvalue',
    #                     help='Start from result number', default=0)
    parser.add_argument('-v', action='store_true', dest='verbose',
                        help='Enable verbose debugging messages', default=False)
    args = parser.parse_args()

    cb = get_cb_response_object(args)

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # startvalue = args.startvalue
    startvalue = 0
    return dump_all_binaries(cb, args.destdir, startvalue)

def startRPC(self, port, eventListenerPort):
    logging.basicConfig(filename='worldpay-within-wrapper.log',
                        level=logging.DEBUG)
    reqOS = ["darwin", "win32", "windows", "linux"]
    reqArch = ["x64", "ia32"]
    cfg = launcher.Config(reqOS, reqArch)
    launcherLocal = launcher.launcher()
    # define log file name for rpc agent, so e.g
    # for "runConsumerOWP.py" it will be: "rpc-wpwithin-runConsumerOWP.log"
    logfilename = os.path.basename(sys.argv[0])
    logfilename = "rpc-wpwithin-" + logfilename.rsplit(".", 1)[0] + ".log"
    args = []
    if eventListenerPort > 0:
        logging.debug(str(os.getcwd()) + "" + "-port " + str(port) +
                      " -logfile " + logfilename +
                      " -loglevel debug,warn,error,fatal,info" +
                      " -callbackport " + str(eventListenerPort))
        args = ['-port', str(port),
                '-logfile', logfilename,
                '-loglevel', 'debug,warn,error,fatal,info',
                '-callbackport', str(eventListenerPort)]
    else:
        logging.debug(str(os.getcwd()) + "" + "-port " + str(port) +
                      " -logfile " + logfilename +
                      " -loglevel debug,warn,error,fatal,info")
        args = ['-port', str(port),
                '-logfile', logfilename,
                '-loglevel', 'debug,warn,error,fatal,info']
    process = launcherLocal.launch(cfg, os.getcwd() + "", args)

    return process

def cli(ctx, registry, build_container_image, build_container_tag, build_container_net, verbose):
    """ Easily dockerize your Git repository """
    logging_level = logging.DEBUG if verbose else logging.INFO
    utils.configure_logging(name='skipper', level=logging_level)

    ctx.obj['registry'] = registry
    ctx.obj['build_container_image'] = build_container_image
    ctx.obj['build_container_net'] = build_container_net
    ctx.obj['git_revision'] = build_container_tag == 'git:revision'
    ctx.obj['build_container_tag'] = git.get_hash() if ctx.obj['git_revision'] else build_container_tag
    ctx.obj['env'] = ctx.default_map.get('env', {})
    ctx.obj['containers'] = ctx.default_map.get('containers')
    ctx.obj['volumes'] = ctx.default_map.get('volumes')
    ctx.obj['workdir'] = ctx.default_map.get('workdir')
    ctx.obj['container_context'] = ctx.default_map.get('container_context')

def AkamaiEdgeGridConfig_Setup(config_file, section):
    config_file = os.path.expanduser(config_file)

    if debug:
        print("DEBUG: config_file", config_file)

    # Currently unused.
    required_options = ['client_token', 'client_secret', 'host', 'access_token']
    EdgeGridConfig = {}

    if os.path.isfile(config_file):
        config = ConfigParser.ConfigParser()
        config.read_file(open(config_file))  # readfp() was removed in Python 3.12
        for key, value in config.items(section):
            # ConfigParser lowercases magically
            EdgeGridConfig[key] = value
    else:
        print("Missing configuration file. Run python gen_creds.py to get your "
              "credentials file set up once you've provisioned credentials in LUNA.")
        exit()

    EdgeGridConfig['host'] = '%s://%s' % ('https', EdgeGridConfig['host'])

    if debug:
        print(EdgeGridConfig)

    return EdgeGridConfig

# Setup an EdgeGrid Session using the EdgeGridConfig previously loaded.

def log(self, message, level=logging.DEBUG, depth=0):
    """Prepend string to log messages to denote class."""
    if depth <= 0:
        prefix = 'AmazonAccountUtils: '
    else:
        prefix = "\t" * depth
    if level == CRITICAL:
        self.logger.critical(prefix + str(message))
    elif level == ERROR:
        self.logger.error(prefix + str(message))
    elif level == WARNING:
        self.logger.warning(prefix + str(message))
    elif level == INFO:
        self.logger.info(prefix + str(message))
    else:
        self.logger.debug(prefix + str(message))

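Worth noting: the if/elif chain above reimplements dispatch the stdlib already offers. Logger.log takes the numeric level as its first argument, so under the same assumptions the body could essentially collapse to a single call (unknown levels then pass through as-is rather than falling back to debug):

# Equivalent stdlib dispatch; prefix is computed exactly as above
self.logger.log(level, prefix + str(message))
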
def logger(level, name, logfile):
    """ Create and configure file and console logging.

    :param level: console debugging level only.
    :param name: logger name
    :param logfile: log destination file name
    :return: configured logging object
    """
    logger = logging.getLogger(name)
    # Let the handlers do the filtering: without this, the logger inherits the
    # root level (WARNING) and DEBUG/INFO records never reach the handlers.
    logger.setLevel(logging.DEBUG)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    file_handler = logging.FileHandler(logfile)
    file_handler.setLevel(logging.DEBUG)
    console_formatter = logging.Formatter("[%(levelname)s] %(message)s")
    file_formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    console_handler.setFormatter(console_formatter)
    file_handler.setFormatter(file_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger

def __init__(self, file=None, cff2GetGlyphOrder=None, topSize=0, isCFF2=None):
    assert (isCFF2 is None) == (file is None)
    self.cff2GetGlyphOrder = cff2GetGlyphOrder
    if file is not None and isCFF2:
        self._isCFF2 = isCFF2
        self.items = []
        name = self.__class__.__name__
        log.log(DEBUG, "loading %s at %s", name, file.tell())
        self.file = file
        count = 1
        self.items = [None] * count
        self.offsets = [0, topSize]
        self.offsetBase = file.tell()
        # pretend we've read the whole lot
        file.seek(self.offsetBase + topSize)
        log.log(DEBUG, "    end of %s at %s", name, file.tell())
    else:
        super(TopDictIndex, self).__init__(file, isCFF2=isCFF2)

def read(self, parent, value):
    if value == 0:
        return "StandardEncoding"
    elif value == 1:
        return "ExpertEncoding"
    else:
        assert value > 1
        file = parent.file
        file.seek(value)
        log.log(DEBUG, "loading Encoding at %s", value)
        fmt = readCard8(file)
        haveSupplement = fmt & 0x80
        if haveSupplement:
            raise NotImplementedError("Encoding supplements are not yet supported")
        fmt = fmt & 0x7f
        if fmt == 0:
            encoding = parseEncoding0(parent.charset, file, haveSupplement,
                                      parent.strings)
        elif fmt == 1:
            encoding = parseEncoding1(parent.charset, file, haveSupplement,
                                      parent.strings)
        return encoding

def main(args=None):
    from fontTools import configLogger

    if args is None:
        args = sys.argv[1:]

    options = Options()
    args = options.parse_opts(args)

    if len(args) < 1:
        print("usage: pyftmerge font...", file=sys.stderr)
        return 1

    configLogger(level=logging.INFO if options.verbose else logging.WARNING)
    if options.timing:
        timer.logger.setLevel(logging.DEBUG)
    else:
        timer.logger.disabled = True

    merger = Merger(options=options)
    font = merger.merge(args)
    outfile = 'merged.ttf'
    with timer("compile and save font"):
        font.save(outfile)

def get_org_id(org_name):
    """ Return the Organisation ID for a given Org Name """
    # Check if our organization exists, and extract its ID
    org = get_json(SAT_API + "organizations/" + org_name)
    # If the requested organization is not found, exit
    if org.get('error', None):
        msg = "Organization '%s' does not exist." % org_name
        log_msg(msg, 'ERROR')
        sys.exit(1)
    else:
        # Our organization exists, so let's grab the ID and write some debug
        org_id = org['id']
        msg = "Organisation '" + org_name + "' found with ID " + str(org['id'])
        log_msg(msg, 'DEBUG')
    return org_id

def log_msg(msg, level):
    """Write message to logfile"""
    # If we are NOT in debug mode, only write non-debug messages to the log
    if level == 'DEBUG':
        if DEBUG:
            logging.debug(msg)
            print(BOLD + "DEBUG: " + msg + ENDC)
    elif level == 'ERROR':
        logging.error(msg)
        tf.write('ERROR:' + msg + '\n')
        print(ERROR + "ERROR: " + msg + ENDC)
    elif level == 'WARNING':
        logging.warning(msg)
        tf.write('WARNING:' + msg + '\n')
        print(WARNING + "WARNING: " + msg + ENDC)
    # Otherwise if we ARE in debug, write everything to the log AND stdout
    else:
        logging.info(msg)
        tf.write(msg + '\n')

def SendAndAccount(self, binary_data):
    # Keep this check!
    if self._logger.isEnabledFor(logging.DEBUG):
        logging.debug("!! Sending BIN data: {0}".format(binascii.hexlify(binary_data)))

    datalen = len(binary_data)
    if self._is_udp:
        self._swarm.SendData(self.ip_address, self.udp_port, binary_data)
    else:
        # Prevent crashes when TCP connection is already removed,
        # but some sending is still pending
        if self._proto is not None:
            self._proto.send_data(binary_data)
            self._swarm._all_data_tx += datalen
        else:
            return  # No need to increase sent data counter...
    self._total_data_tx += datalen

def HandleRequest(self, msg_request):
    """Handle incoming REQUEST message"""
    for x in range(msg_request.start_chunk, msg_request.end_chunk + 1):
        # Ignore requests for discarded chunks
        if x <= self._swarm._last_discarded_id:
            continue
        self.set_requested.add(x)
        # TODO: We might want a more intelligent ACK mechanism than this,
        #       but this works well for now
        self.set_sent.discard(x)

    if self._logger.isEnabledFor(logging.DEBUG):
        logging.debug("FROM > {0} > REQUEST: {1}".format(self._peer_num, msg_request))

    # Try to send some data
    if self._sending_handle is None:
        self._sending_handle = asyncio.get_event_loop().call_soon(self.SendRequestedChunks)

def set_debug(self, state):
    "Turn debugging on or off, remembering the last-set level"
    if state == self.forced_debug:
        self.debug("Debug signal ignored; already %sdebugging",
                   "" if state else "not ")
    if state:
        self.level(DEBUG, save=False)
        self.debug("Debug started")
    else:
        self.debug("Debug discontinued")
        self.level(self.last_level)
    self.forced_debug = state
    self.__update_env()

def main(): print("see log scrape.log") if os.path.isfile("scrape.log"): os.remove("scrape.log") log.basicConfig(filename="scrape.log", format='%(asctime)s %(levelname)s %(message)s', level=log.DEBUG) try: log.debug("main() full scrape will take 5-10 minutes") cards, tokens = loadJsonCards() saveCardsAsJson("data/cards.json", loadSets(allcards=cards)) # a lot of token names are not unique # a static, handmade list of ids is more reliable if os.path.isfile('data/tokenlist.json'): with open('data/tokenlist.json', 'r', encoding='utf8') as f: saveCardsAsJson("data/tokens.json", loadTokens(tokens, json.load(f))) except Exception as e: log.exception("main() error %s", e)
def __init__(self, debug=False, logfile=None):
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()

        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)

        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    except IOError as e:
        if e.errno == errno.EACCES:
            pass

def __init__(self, debug=False):
    """
    Constructor of the Application.

    :param debug: Sets the logging level of the application
    :raises NotImplementedError: When ``Application.base_title`` not set in
        the class definition.
    """
    self.debug = debug
    loglevel = logging.DEBUG if debug else logging.WARNING
    logging.basicConfig(
        format='%(asctime)s - [%(levelname)s] %(message)s',
        datefmt='%I:%M:%S %p',
        level=loglevel)

    self.processor = EventProcessor()
    self.server = EventServer(processor=self.processor)

    if self.base_title is None:
        raise NotImplementedError

    self.services = {}
    self.views = {}
    self.current_view = None

    self.register('init', lambda evt, interface: self._load_view('default'))

def configure_logging(debug):
    '''Sets the data kennel logger to appropriate levels of chattiness.'''
    default_logger = logging.getLogger('')
    datadog_logger = logging.getLogger('datadog.api')
    requests_logger = logging.getLogger('requests')
    if debug:
        default_logger.setLevel(logging.DEBUG)
        datadog_logger.setLevel(logging.INFO)
        requests_logger.setLevel(logging.INFO)
    else:
        default_logger.setLevel(logging.INFO)
        datadog_logger.setLevel(logging.WARNING)
        requests_logger.setLevel(logging.WARNING)

    stream_handler = logging.StreamHandler(sys.__stdout__)
    stream_handler.setLevel(logging.DEBUG)
    stream_handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    default_logger.addHandler(stream_handler)

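Note the pattern above: the handler is pinned at DEBUG, so the per-logger levels do all the filtering. A quick illustration under that assumption (messages are illustrative):

configure_logging(debug=False)
logging.getLogger('datadog.api').debug("dropped: this logger's level is WARNING")
logging.getLogger('datadog.api').warning("emitted: passes the logger; the handler accepts DEBUG and above")
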
def run_gracefully(main_function):
    """
    Run a "main" function with standardized exception trapping, to make it
    easy to avoid certain unnecessary stack traces.

    If debug logging is switched on, stack traces will return.
    """
    try:
        main_function()
    except EasyExit as msg:
        logger.error(str(msg))
        sys.exit(1)
    except KeyboardInterrupt:
        # swallow the exception unless we turned on debugging, in which case
        # we might want to know what infinite loop we were stuck in
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            raise
        sys.exit(1)

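A hedged usage sketch (EasyExit comes from the same module as run_gracefully; my_main is a hypothetical entry point):

def my_main():
    # Abort cleanly with a message instead of a traceback
    raise EasyExit("required configuration file not found")

run_gracefully(my_main)  # logs the message via logger.error and exits with status 1
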
def init_logger(model):
    """ Initialize the logger. """
    logger = logging.getLogger('cumulusci')

    # Remove existing handlers
    for handler in list(logger.handlers):
        handler.stream.flush(force=True)
        logger.removeHandler(handler)

    # Create the custom handler
    formatter = coloredlogs.ColoredFormatter(fmt='%(asctime)s: %(message)s')
    handler = LogHandler(model)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    return logger