我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用logging.INFO。
def create_logger():
    """Configure the root logger for console output.

    The root logger is set to INFO and receives one new stream handler.
    Coloured output is used only when ``HAVE_COLORLOG`` is truthy and file
    descriptor 2 is a terminal; otherwise a plain formatter is installed.

    Returns the logger named after this module.
    """
    base_format = '%(asctime)s - %(levelname)-8s - %(message)s'
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    root = logging.getLogger()  # root logger
    root.setLevel(logging.INFO)

    if HAVE_COLORLOG and os.isatty(2):
        palette = {
            'DEBUG': 'reset',
            'INFO': 'reset',
            'WARNING': 'bold_yellow',
            'ERROR': 'bold_red',
            'CRITICAL': 'bold_red',
        }
        formatter = colorlog.ColoredFormatter(
            '%(log_color)s' + base_format, timestamp_format,
            log_colors=palette)
    else:
        formatter = logging.Formatter(base_format, timestamp_format)

    console = logging.StreamHandler()
    console.setFormatter(formatter)
    root.addHandler(console)
    return logging.getLogger(__name__)
def init_logger(self, args):
    """Configure console and rotating-file logging from parsed CLI flags.

    :param args: object exposing truthy/falsy ``verbose`` and ``debug``
        attributes. ``debug`` takes precedence when both are set.

    Side effects: calls ``logging.basicConfig``, attaches a rotating file
    handler writing ``arbitrage.log`` (100 MiB per file, 10 backups) to the
    root logger, and raises the ``requests``/``urllib3`` loggers to WARNING.
    """
    level = logging.INFO
    if args.verbose:
        # The standard library defines no logging.VERBOSE, so the bare
        # attribute access raised AttributeError. Honour a custom level if
        # one was registered; otherwise use 15, between DEBUG and INFO.
        level = getattr(logging, 'VERBOSE', 15)
    if args.debug:
        level = logging.DEBUG

    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
                        level=level)

    Rthandler = RotatingFileHandler('arbitrage.log',
                                    maxBytes=100 * 1024 * 1024,
                                    backupCount=10)
    Rthandler.setLevel(level)
    Rthandler.setFormatter(
        logging.Formatter('%(asctime)-12s [%(levelname)s] %(message)s'))
    logging.getLogger('').addHandler(Rthandler)

    # Third-party HTTP libraries are chatty below WARNING.
    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
def setup_logging(log_level=logging.INFO):
    """Initialise root logging and quieten noisy libraries.

    :param log_level: level applied through ``basicConfig`` and then set
        explicitly on the root logger.

    If ``colorlog`` can be imported, the first root handler gets a coloured
    formatter; otherwise the formatter is left untouched.
    """
    logging.basicConfig(level=log_level)

    plain_format = ("%(asctime)s %(levelname)s (%(threadName)s) "
                    "[%(name)s] %(message)s")
    colored_format = "%(log_color)s{}%(reset)s".format(plain_format)
    date_format = '%Y-%m-%d %H:%M:%S'

    # Suppress overly verbose logs from libraries that aren't helpful
    for noisy_name in ('requests', 'urllib3', 'aiohttp.access'):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)

    try:
        from colorlog import ColoredFormatter
        level_colors = {
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'red',
        }
        first_handler = logging.getLogger().handlers[0]
        first_handler.setFormatter(ColoredFormatter(
            colored_format,
            datefmt=date_format,
            reset=True,
            log_colors=level_colors,
        ))
    except ImportError:
        # colorlog is optional; keep the default formatter.
        pass

    root = logging.getLogger('')
    root.setLevel(log_level)
def setup_logging(verbose=0, colors=False, name=None):
    """Configure console logging.

    Info and below go to stdout, others go to stderr.

    :param int verbose: Verbosity level. > 0 print debug statements. > 1 passed to sphinx-build.
    :param bool colors: Print color text in non-verbose mode.
    :param str name: Which logger name to set handlers to. Used for testing.
    """
    debug_enabled = verbose > 0

    class _AtMostInfo(logging.Filter):
        """Accept only records whose level does not exceed INFO."""

        def filter(self, record):
            return record.levelno <= logging.INFO

    target_logger = logging.getLogger(name)
    target_logger.setLevel(logging.DEBUG if debug_enabled else logging.INFO)

    formatter = ColorFormatter(debug_enabled, colors)
    if colors:
        colorclass.Windows.enable()

    # stdout: DEBUG and INFO only (the filter caps the upper bound).
    out_handler = logging.StreamHandler(sys.stdout)
    out_handler.setFormatter(formatter)
    out_handler.setLevel(logging.DEBUG)
    out_handler.addFilter(_AtMostInfo())
    target_logger.addHandler(out_handler)

    # stderr: WARNING and above.
    err_handler = logging.StreamHandler(sys.stderr)
    err_handler.setFormatter(formatter)
    err_handler.setLevel(logging.WARNING)
    target_logger.addHandler(err_handler)
def __init__(self, queue, DEBUG=config.DEBUG, reset=False, socksport=None):
    """Initialise the thread together with its proxy and bookkeeping state.

    :param queue: multithreading queue of urls
    :param DEBUG: when truthy, ``logging.basicConfig`` is called at INFO level
    :param reset: whether to check if a url has been collected
    :param socksport: SOCKS proxy port; ``config.SOCKS_PORT`` is used when falsy
    """
    proxy_port = socksport or config.SOCKS_PORT
    ## TODO add checks that a socks proxy is even open
    ## TODO add Tor checks to make sure circuits are operating
    threading.Thread.__init__(self)

    self.reset = reset  # Whether to check if a url has been collected
    self.queue = queue  # Multithreading queue of urls

    self.proxysettings = [
        '--proxy=127.0.0.1:%s' % proxy_port,
        '--proxy-type=socks5',
    ]
    # No SSL-related arguments are passed; an empty list keeps the
    # concatenation below uniform.
    self.ignore_ssl = []
    self.service_args = self.proxysettings + self.ignore_ssl

    self.failcount = 0  # Counts failures
    self.donecount = 0  # Counts successes
    self.tor = tor.tor()  # Manages Tor via control port

    if DEBUG:
        # PhantomJS sends a lot of data if debug set to DEBUG
        logging.basicConfig(level=logging.INFO)
def init_logging(logfile):
    """Attach a file handler to the root logger and set it to INFO.

    :param logfile: path handed to ``logging.FileHandler``.
    :return: the ``logging`` module itself, so callers can log through it.
    """
    root = logging.getLogger()

    file_handler = logging.FileHandler(logfile)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(module)s: %(message)s',
        datefmt='%m/%d/%Y %H:%M:%S',
    ))
    # No per-handler level is set and no console handler is added.

    root.addHandler(file_handler)
    root.setLevel(logging.INFO)
    return logging
def init_logger(logger_name):
    """Create a file-backed logger configured from ``cfg.CONF.service``.

    :param logger_name: name passed to ``logging.getLogger``.
    :return: the logger, at DEBUG when ``enable_debug_log_entries`` is
        truthy and at INFO otherwise.
    """
    service_conf = cfg.CONF.service
    log_path = '%s/%s' % (service_conf.service_log_path,
                          service_conf.service_log_filename)

    file_handler = logging.FileHandler(log_path)
    # The surrounding single quotes are part of the emitted record text.
    file_handler.setFormatter(logging.Formatter(
        "'%(asctime)s - %(pathname)s:%(lineno)s - %(levelname)s - %(message)s'"))

    log = logging.getLogger(logger_name)
    log.addHandler(file_handler)

    chosen_level = logging.INFO
    if cfg.CONF.service.enable_debug_log_entries:
        chosen_level = logging.DEBUG
    log.setLevel(chosen_level)
    return log
def hidden_cursor(file):
    """Yield once, hiding the terminal cursor around the yield when sensible.

    NOTE(review): presumably decorated with ``contextlib.contextmanager``
    outside the visible text - confirm.

    :param file: stream that receives the hide/show control sequences.
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    # We also don't want to clutter the output with control characters if
    # we're writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    if WINDOWS or not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        # Always restore the cursor, even if the body raised.
        file.write(SHOW_CURSOR)
def open_spinner(message):
    """Yield a spinner and report its final status when the body ends.

    NOTE(review): presumably decorated with ``contextlib.contextmanager``
    outside the visible text - confirm.

    :param message: text handed to the spinner constructor.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    interactive = (sys.stdout.isatty()
                   and logger.getEffectiveLevel() <= logging.INFO)
    spinner = (InteractiveSpinner(message) if interactive
               else NonInteractiveSpinner(message))

    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
def main():
    """Run NanoQC: log to ``NanoQC.log`` in the output directory and plot.

    The gzipped FASTQ file named by ``args.fastq`` is read twice: once for
    the length histogram and once to bin the reads for plotting.
    """
    args = get_args()
    logging.basicConfig(
        format='%(asctime)s %(message)s',
        filename=os.path.join(args.outdir, "NanoQC.log"),
        level=logging.INFO)
    logging.info("NanoQC started.")

    # Open the input in ``with`` blocks so both handles are closed; the
    # previous code leaked them.
    # NOTE(review): assumes length_histogram and get_bin finish reading
    # before they return - confirm.
    with gzip.open(args.fastq, 'rt') as fastq_handle:
        sizeRange = length_histogram(
            fqin=fastq_handle,
            name=os.path.join(args.outdir, "SequenceLengthDistribution.png"))
    with gzip.open(args.fastq, 'rt') as fastq_handle:
        fq = get_bin(fastq_handle, sizeRange)
    logging.info("Using {} reads for plotting".format(len(fq)))

    # Each entry of fq is indexed as (sequence, quality).
    fqbin = [dat[0] for dat in fq]
    qualbin = [dat[1] for dat in fq]

    logging.info("Creating plots...")
    per_base_sequence_content_and_quality(fqbin, qualbin, args.outdir, args.format)
    logging.info("per base sequence content and quality completed.")
    logging.info("Finished!")
def init_logging(logfile, debug=True, level=None):
    """Simple configuration of logging.

    :param logfile: file that ``logging.basicConfig`` appends to.
    :param debug: selects DEBUG when truthy, INFO otherwise.
    :param level: when truthy, overrides the level chosen by ``debug``.
    :return: the logger named ``"circus"``.
    """
    # allow user to override exact log_level
    log_level = level or (logging.DEBUG if debug else logging.INFO)

    logging.basicConfig(
        level=log_level,
        format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
        filename=logfile,
        filemode='a',
    )
    return logging.getLogger("circus")
def main():
    """Start the Qt browse window with console logging at INFO.

    Recognised command line options: ``--domainname=<name>`` and
    ``-v`` / ``--verbose``; they are forwarded to ``BrowseWindow`` as the
    ``domainName`` and ``verbose`` keyword arguments.
    """
    # Set up a console logger.
    root = logging.getLogger()
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(
        "%(asctime)s %(name)-12s:%(levelname)-8s: %(message)s"))
    root.addHandler(console)
    root.setLevel(logging.INFO)

    window_kwargs = {}
    parsed_opts, _remaining = getopt.getopt(
        sys.argv[1:], 'v', ['domainname=', 'verbose'])
    for flag, value in parsed_opts:
        if flag == '--domainname':
            window_kwargs['domainName'] = value
        if flag in ['-v', '--verbose']:
            window_kwargs['verbose'] = True

    app = QApplication(sys.argv)
    # Quit the application once its last window is closed.
    QObject.connect(app, SIGNAL("lastWindowClosed()"), app, SLOT("quit()"))
    window = BrowseWindow(**window_kwargs)
    window.show()
    app.exec_()
def ConvertLog4ToCFLevel( log4level ):
    """Translate a ``logging`` level number into a ``CF.LogLevels`` value.

    ``logging.FATAL + 1`` maps to OFF and ``logging.NOTSET`` to ALL;
    unrecognised levels map to ``CF.LogLevels.INFO``.
    """
    if log4level == logging.FATAL + 1:
        return CF.LogLevels.OFF

    # (logging attribute, CF.LogLevels attribute), checked in this order.
    # Attributes are resolved lazily, one comparison at a time.
    # NOTE(review): logging.TRACE is not a stdlib level; presumably it is
    # registered elsewhere in the project - confirm.
    ordered_pairs = (
        ('FATAL', 'FATAL'),
        ('ERROR', 'ERROR'),
        ('WARN', 'WARN'),
        ('INFO', 'INFO'),
        ('DEBUG', 'DEBUG'),
        ('TRACE', 'TRACE'),
        ('NOTSET', 'ALL'),
    )
    for py_name, cf_name in ordered_pairs:
        if log4level == getattr(logging, py_name):
            return getattr(CF.LogLevels, cf_name)
    return CF.LogLevels.INFO
def ConvertToLog4Level( newLevel ):
    """Translate a ``CF.LogLevels`` value into a ``logging`` level number.

    OFF maps to ``logging.FATAL + 1`` and ALL to ``logging.TRACE``;
    unrecognised values yield ``logging.INFO``.
    """
    level = logging.INFO
    if newLevel == CF.LogLevels.OFF:
        level = logging.FATAL + 1

    # (CF.LogLevels attribute, logging attribute). Every pair is checked,
    # so a later match overrides an earlier one.
    # NOTE(review): logging.TRACE is not a stdlib level; presumably it is
    # registered elsewhere in the project - confirm.
    ordered_pairs = (
        ('FATAL', 'FATAL'),
        ('ERROR', 'ERROR'),
        ('WARN', 'WARN'),
        ('INFO', 'INFO'),
        ('DEBUG', 'DEBUG'),
        ('TRACE', 'TRACE'),
        ('ALL', 'TRACE'),
    )
    for cf_name, py_name in ordered_pairs:
        if newLevel == getattr(CF.LogLevels, cf_name):
            level = getattr(logging, py_name)
    return level
def __init__(self, resource=None ):
    """Initialise manager state and try to resolve the EventChannelManager.

    :param resource: optional object providing ``getDomainManager()``.
        When omitted or falsy, no lookup is attempted and ``self._ecm``
        stays ``None``.

    A failed lookup is logged as a warning and otherwise ignored.
    """
    self._mgr_lock = threading.Lock()
    self._ecm = None
    self._logger = logging.getLogger("ossie.events.Manager")
    self._logger.setLevel(logging.INFO)
    self._allow = True
    self._registrations=[]
    if resource :
        try:
            self._logger.debug("Requesting Domain Manager Access....")
            dom = resource.getDomainManager()
            self._logger.debug("Requesting EventChannelManager Access....")
            self._ecm = dom.getRef()._get_eventChannelMgr()
            self._logger.debug("Acquired reference to EventChannelManager")
        except Exception:
            # Best effort: keep going without an EventChannelManager.
            # ``except Exception`` replaces the bare ``except`` so that
            # KeyboardInterrupt/SystemExit still propagate.
            self._logger.warning("EventChannelManager - unable to resolve DomainManager's EventChannelManager ")
def run(self, args=None, namespace=None):
    """Parse options, configure log levels and dispatch to a handler.

    :param args: argument list forwarded to ``self.parser.parse_args``.
    :param namespace: namespace forwarded to ``self.parser.parse_args``.
    :return: whatever the selected handler returns when called with
        ``(logger, options)``.
    :raises AttributeError: when the options carry no ``handler`` and
        ``self.default_handler`` is not callable.
    """
    options = self.parser.parse_args(args=args, namespace=namespace)
    enable_pretty_logging()
    logger = logging.getLogger(__name__)

    # ``logging.getLogger('root')`` only returns the real root logger on
    # Python 3.9+; earlier versions create an unrelated logger that merely
    # has that name. Ask for the root logger explicitly instead.
    root_logger = logging.getLogger()

    # todo configure_logger() method ?
    if options.debug:
        root_logger.setLevel(logging.INFO)

    if options.verbose:
        if options.verbose >= 1:
            root_logger.setLevel(logging.DEBUG)
        if options.verbose >= 2:
            # The former ``INFO if verbose < 2 else DEBUG`` could only ever
            # pick DEBUG inside this branch.
            logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)

    try:
        handler = options.handler
    except AttributeError:
        if not callable(self.default_handler):
            raise
        handler = None

    return (handler or self.default_handler)(logger, options)
def setup(name=__name__, level=logging.INFO):
    """Return a logger with a single stream handler, creating it on demand.

    :param name: logger name.
    :param level: level applied to a newly configured logger.
    :return: the logger. If it already has handlers it is returned
        untouched, so repeated calls do not stack handlers.
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger

    logger.setLevel(level)

    try:
        # check if click exists to swap the logger
        import click  # noqa
        formatter = ColorFormatter('[.] %(message)s')
    except ImportError:
        formatter = CustomFormatter('[.] %(message)s')

    handler = logging.StreamHandler(None)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # The unconditional ``logger.setLevel(logging.INFO)`` that used to sit
    # here was removed: it discarded the caller's ``level`` argument.
    return logger
def __init__(self, appname, dllname=None, logtype="Application"):
    """Register ``appname`` as an event source if the Win32 modules import.

    :param appname: application name registered as the event source.
    :param dllname: message DLL path; when falsy, ``win32service.pyd`` is
        used, located two directory levels above ``win32evtlogutil``.
    :param logtype: event log to register under.

    When the Win32 modules cannot be imported a notice is printed and
    ``self._welu`` is set to ``None``.
    """
    logging.Handler.__init__(self)
    try:
        import win32evtlogutil
        import win32evtlog

        self.appname = appname
        self._welu = win32evtlogutil
        if not dllname:
            package_root = os.path.dirname(
                os.path.dirname(self._welu.__file__))
            dllname = os.path.join(package_root, r'win32service.pyd')
        self.dllname = dllname
        self.logtype = logtype
        self._welu.AddSourceToRegistry(appname, dllname, logtype)

        # Event type used for levels that are absent from the map below.
        self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
        information = win32evtlog.EVENTLOG_INFORMATION_TYPE
        self.typemap = {
            logging.DEBUG: information,
            logging.INFO: information,
            logging.WARNING: win32evtlog.EVENTLOG_WARNING_TYPE,
            logging.ERROR: win32evtlog.EVENTLOG_ERROR_TYPE,
            logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
        }
    except ImportError:
        print("The Python Win32 extensions for NT (service, event "
              "logging) appear not to be available.")
        self._welu = None
def main():
    """Parse CLI options and dump all binaries from a Cb server.

    :return: the result of ``dump_all_binaries``.
    """
    parser = build_cli_parser("Grab all binaries from a Cb server")
    parser.add_argument('-d', '--destdir', action='store',
                        help='Destination directory to place the events',
                        default=os.curdir)
    # TODO: we don't have a control on the "start" value in the query yet,
    # so no --start option is exposed.
    parser.add_argument('-v', action='store_true', dest='verbose',
                        help='Enable verbose debugging messages',
                        default=False)
    args = parser.parse_args()
    cb = get_cb_response_object(args)

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level)

    # Always start from the first result (see the TODO above).
    startvalue = 0
    return dump_all_binaries(cb, args.destdir, startvalue)
def __init__(self, session, api_key, service_key):
    """
    :param session: The Flask requests object used to connect to PD
    :param api_key: The PD read-only, V2 API key
    :param service_key: The PD service name which is interrogated
    """
    logging.basicConfig(level=logging.INFO)

    self._api_key = api_key
    self._service_key = service_key
    self.timezone = 'UTC'

    # Install the API version and token headers on the session so they are
    # part of its header set from now on.
    self._s = session
    self._headers = {
        'Accept': 'application/vnd.pagerduty+json;version=2',
        'Authorization': 'Token token=' + self._api_key,
    }
    self._s.headers.update(self._headers)
def instantiate(p):
    """Set up the module-wide rotating file logger and record creation.

    :param p: value handed in by the caller; it is only printed.
    :return: 0 for success (the convention is -1 for failure).
    """
    global logger
    print("*** instantiate ***")
    print(p)

    with rlock:
        logger = logging.getLogger("freepydius-logger")
        logger.setLevel(logging.INFO)
        rotating = TimedRotatingFileHandler(_LOG_FILE,
                                            when="midnight",
                                            interval=1)
        rotating.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        logger.addHandler(rotating)

    instance_log = Log("INSTANCE")
    instance_log.log((('Response', 'created'),))
    # return 0 for success or -1 for failure
    return 0
def __init__(self, apply_light_policy_interval = 10, device_detection_interval = 10, device_offline_delay = 10, logging_level = logging.INFO):
    """Initialise controller state, signal handling and logging.

    :param apply_light_policy_interval: stored as the policy interval.
    :param device_detection_interval: stored as the detection interval.
    :param device_offline_delay: stored as the offline delay.
    :param logging_level: forwarded to the log setup helper.
    """
    # Timing configuration supplied by the caller.
    self.__apply_light_policy_interval = apply_light_policy_interval
    self.__device_detection_interval = device_detection_interval
    self.__device_offline_delay = device_offline_delay

    # Thread handles and the locks guarding them.
    self.__yeelight_detection_thread = None
    self.__device_detection_thread = None
    self.__apply_light_policy_thread = None
    self.__device_detection_thread_woker = {}
    self.__device_detection_thread_rlock = threading.Lock()
    self.__thread_rlock = threading.Lock()

    # Policy, location and device bookkeeping.
    self.__current_geo = None
    self.__compiled_policy = []
    self.__compiled_policy_date = None
    self.__device_on_monitor = []
    self.__device_online = []
    self.__config = {}
    self.__RUNNING = False

    # a few setups
    self.register_signal_handler()
    self.__setup_log(logging_level = logging_level)
    self.__logger.info("Controller instance created")
def cli(ctx, registry, build_container_image, build_container_tag, build_container_net, verbose):
    """ Easily dockerize your Git repository """
    # Configure logging first so later steps can report at the right level.
    utils.configure_logging(
        name='skipper',
        level=logging.DEBUG if verbose else logging.INFO)

    shared = ctx.obj
    shared['registry'] = registry
    shared['build_container_image'] = build_container_image
    shared['build_container_net'] = build_container_net

    # The literal tag 'git:revision' means "use the current git hash".
    shared['git_revision'] = build_container_tag == 'git:revision'
    if shared['git_revision']:
        shared['build_container_tag'] = git.get_hash()
    else:
        shared['build_container_tag'] = build_container_tag

    # Remaining settings are copied from the default map.
    shared['env'] = ctx.default_map.get('env', {})
    for key in ('containers', 'volumes', 'workdir', 'container_context'):
        shared[key] = ctx.default_map.get(key)
def __init__(self, args, logger=None, mode=None):
    """Wire up command line, logging, configuration and commands.

    :param args: raw arguments handed to ``CommandLine``.
    :param logger: logger to use; when ``None`` the root logger is
        configured at INFO and used instead.
    :param mode: when not ``None``, overrides the command line's mode.
    """
    command_line = CommandLine(args)
    self.commandLine = command_line
    if mode is not None:
        command_line.mode = mode

    if logger is None:
        logging.basicConfig(format="%(asctime)-15s %(levelname)s [%(filename)s:%(lineno)d-%(thread)d] %(message)s")
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
    self.logger = logger

    self.mode = self.__detectMode(self.commandLine.mode)
    self.config = self.__loadConfig()
    self.inventory = Inventory(logger, self.config)

    # Command name -> command object.
    self.commands = {
        'migrate': MigrateCommand(self),
        'list-migrations': ListMigrationsCommand(self),
        'version': VersionCommand(self),
        'help': HelpCommand(self),
    }
    self.defaultCommand = HelpCommand(self)
def log(self, message, level=logging.DEBUG, depth=0):
    """Prepend string to log messages to denote class.

    :param message: object to log; converted with ``str``.
    :param level: one of CRITICAL, ERROR, WARNING or INFO; anything else
        is logged at debug level.
    :param depth: values above 0 replace the class prefix with that many
        tab characters.
    """
    prefix = "\t" * depth if depth > 0 else 'AmazonAccountUtils: '

    # Checked in this order; the first equal level wins.
    routes = (
        (CRITICAL, 'critical'),
        (ERROR, 'error'),
        (WARNING, 'warning'),
        (INFO, 'info'),
    )
    for known_level, method_name in routes:
        if level == known_level:
            getattr(self.logger, method_name)(prefix + str(message))
            return
    self.logger.debug(prefix + str(message))
def test_missing_variable(self):
    """Test if ``WriteTensorBoard`` handles missing image variables as expected."""
    epoch_without_plot = {'valid': {}}

    def build_hook(policy):
        # One fresh hook per policy, always asking for the 'plot' image.
        return WriteTensorBoard(output_dir=self.tmpdir,
                                model=self.get_model(),
                                image_variables=['plot'],
                                on_missing_variable=policy)

    with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
        # 'ignore': nothing is logged
        ignoring_hook = build_hook('ignore')
        with LogCapture(level=logging.INFO) as ignore_capture:
            ignoring_hook.after_epoch(42, epoch_without_plot)
        ignore_capture.check()

        # 'warn': exactly one warning on the root logger
        warning_hook = build_hook('warn')
        with LogCapture(level=logging.INFO) as warn_capture:
            warning_hook.after_epoch(42, epoch_without_plot)
        warn_capture.check(('root', 'WARNING', '`plot` not found in epoch data.'))

        # 'error': the missing variable raises
        raising_hook = build_hook('error')
        with self.assertRaises(KeyError):
            raising_hook.after_epoch(42, epoch_without_plot)
def main(args=None):
    """Merge the fonts named on the command line into ``merged.ttf``.

    :param args: argument list; defaults to ``sys.argv[1:]``.
    :return: 1 when no font is given, otherwise ``None``.
    """
    from fontTools import configLogger

    raw_args = sys.argv[1:] if args is None else args
    options = Options()
    args = options.parse_opts(raw_args)

    if len(args) < 1:
        print("usage: pyftmerge font...", file=sys.stderr)
        return 1

    log_level = logging.INFO if options.verbose else logging.WARNING
    configLogger(level=log_level)

    # Timing output is emitted at DEBUG on the timer's own logger.
    if not options.timing:
        timer.logger.disabled = True
    else:
        timer.logger.setLevel(logging.DEBUG)

    merged_font = Merger(options=options).merge(args)
    outfile = 'merged.ttf'
    with timer("compile and save font"):
        merged_font.save(outfile)
def main():
    """Run the LEDBAT test sink on UDP 0.0.0.0:6778 until interrupted."""
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
    logging.info("LEDBAT TEST SINK starting")

    event_loop = asyncio.get_event_loop()
    endpoint = event_loop.create_datagram_endpoint(
        PeerProtocol, local_addr=("0.0.0.0", 6778))
    transport, protocol = event_loop.run_until_complete(endpoint)

    if os.name == 'nt':
        # Keep a timer firing every 0.5 s on Windows.
        # NOTE(review): presumably so Ctrl-C is noticed while the loop is
        # otherwise idle - confirm.
        def wakeup():
            # Call again later
            event_loop.call_later(0.5, wakeup)

        event_loop.call_later(0.5, wakeup)

    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop.
        pass
def main(args):
    """Run the LEDBAT test source until interrupted.

    :param args: parsed options; ``target_ip`` is logged and the whole
        object is handed to ``PeerProtocol``.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
    logging.info("LEDBAT TEST SOURCE starting. Target: {}".format(args.target_ip))

    event_loop = asyncio.get_event_loop()
    endpoint = event_loop.create_datagram_endpoint(
        lambda: PeerProtocol(args), local_addr=("0.0.0.0", 6778))
    transport, protocol = event_loop.run_until_complete(endpoint)

    if os.name == 'nt':
        # Keep a timer firing every 0.5 s on Windows.
        # NOTE(review): presumably so Ctrl-C is noticed while the loop is
        # otherwise idle - confirm.
        def wakeup():
            # Call again later
            event_loop.call_later(0.5, wakeup)

        event_loop.call_later(0.5, wakeup)

    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop.
        pass
def setup(args, pipeline, runmod, injector):
    """Load configuration

    Creates the API object, stores it and its config in ``_globals``,
    optionally swaps in ``pipeline`` and asks the related-entities and
    related-documents services (when present) to load their data.
    """
    logging.basicConfig(
        format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S')

    api = gransk.api.API(injector)
    _globals['gransk'] = api
    _globals['config'] = api.config

    if pipeline:
        api.pipeline = pipeline

    for service_name in ('related_entities', 'related_documents'):
        if _globals['gransk'].pipeline.get_service(service_name):
            _globals['gransk'].pipeline.get_service(service_name).load_all(
                _globals['config'])
def __init__(self, debug=False, logfile=None):
    """Create the 'VirtualBMC' logger with a single handler.

    :param debug: DEBUG level when truthy, INFO otherwise.
    :param logfile: path of the log file; when ``None`` a stream handler
        is used instead of a file handler.
    """
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()

        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)

        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    except IOError as e:
        # ``except IOError, e`` is a syntax error on Python 3; the ``as``
        # form works on Python 2.6+ and 3.
        # NOTE(review): as written, IOErrors with any other errno are
        # silently ignored as well - confirm whether they should re-raise.
        if e.errno == errno.EACCES:
            pass
def configure_logging(debug):
    '''Sets the data kennel logger to appropriate levels of chattiness.

    :param debug: truthy selects the more verbose level set.

    Each call also adds a new stdout stream handler to the root logger.
    '''
    # (root level, library level) for each mode.
    if debug:
        root_level, library_level = logging.DEBUG, logging.INFO
    else:
        root_level, library_level = logging.INFO, logging.WARNING

    root = logging.getLogger('')
    root.setLevel(root_level)
    for library in ('datadog.api', 'requests'):
        logging.getLogger(library).setLevel(library_level)

    # The handler itself accepts everything; the loggers do the filtering.
    console = logging.StreamHandler(sys.__stdout__)
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'))
    root.addHandler(console)
def parse_args():
    """ Parse the command line arguments

    Also configures logging and stores the ``verbose`` and ``fast`` flags
    on ``settings``.

    :return: tuple ``(command, album)``; ``album`` is ``None`` if omitted.
    """
    cli = argparse.ArgumentParser(
        description="Integrate Hugo and PhotoSwipe")
    # -v stores a log level rather than a boolean.
    cli.add_argument('-v', '--verbose',
                     help="Verbose mode",
                     action="store_const",
                     dest="loglevel",
                     const=logging.INFO,
                     default=logging.WARNING)
    cli.add_argument('-f', '--fast',
                     action="store_true",
                     help=('Fast mode '
                           '(tries less potential crops)'))
    cli.add_argument('command',
                     choices=['new', 'update', 'clean', 'init'],
                     help="action to do")
    cli.add_argument('album',
                     nargs='?',
                     help="album to apply the action to")
    parsed = cli.parse_args()

    logging.basicConfig(level=parsed.loglevel,
                        datefmt="[%Y-%m-%d %H:%M:%S]",
                        format="%(asctime)s - %(message)s")

    settings.verbose = parsed.loglevel == logging.INFO
    settings.fast = parsed.fast
    return parsed.command, parsed.album
def getLogger(self):
    ''' Initialize and load log handlers

    The logger is named ``self.proc_name``. Its level is DEBUG when
    ``self.config['logging']['debug']`` is present and truthy, INFO
    otherwise. One handler is added per key of
    ``self.config['logging']['plugins']``; each key names a module
    ``plugins.logging.<key>`` exposing a ``Logger`` class whose ``setup()``
    returns the handler.

    :return: the configured logger.
    '''
    # Local import keeps this block self-contained.
    import importlib

    log_config = self.config['logging']

    logger = logging.getLogger(self.proc_name)
    logger.setLevel(logging.INFO)
    if "debug" in log_config and log_config['debug']:
        logger.setLevel(logging.DEBUG)

    # Load and add a handler for each logging mechanism
    for loghandler in log_config['plugins']:
        # ``__import__(..., level=-1)`` is Python 2 only and raises
        # ValueError on Python 3; import_module works on 2.7 and 3.x.
        plugin = importlib.import_module("plugins.logging." + loghandler)
        lh = plugin.Logger(config=self.config, proc_name=self.proc_name)
        logger.addHandler(lh.setup())
    return logger
def main(argv=None):
    """Configure logging from the verbosity flags and run the pipeline.

    :param argv: argument list forwarded to ``parse_arguments``.
    """
    args = parse_arguments(argv)

    # The verbosity flags only steer logging; drop them before the
    # remaining options are forwarded to the pipeline.
    very_verbose = args.pop('very_verbose')
    verbose = args.pop('verbose')

    logging_options = {}
    if very_verbose:
        logging_options['level'] = logging.DEBUG
    elif verbose:
        logging_options['level'] = logging.INFO
    logging.basicConfig(**logging_options)

    sc = SparkContext(appName="MLR: data collection pipeline")
    # spark info logging is incredibly spammy. Use warn to have some hope of
    # human decipherable output
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)
    run_pipeline(sc, sqlContext, **args)
def parse_arguments(argv):
    """Parse command line options into a plain dict.

    :param argv: list of argument strings (``None`` makes argparse read
        ``sys.argv``).
    :return: dict with keys ``brokers`` (list of strings split on commas),
        ``max_request_size``, ``n_workers``, ``verbose`` and
        ``very_verbose``.
    """
    parser = argparse.ArgumentParser(description='...')
    parser.add_argument(
        '-b', '--brokers', dest='brokers', required=True,
        type=lambda x: x.split(','),
        help='Kafka brokers to bootstrap from as a comma separated list of <host>:<port>')
    parser.add_argument(
        '-m', '--max-request-size', dest='max_request_size', type=int,
        default=4*1024*1024*10,
        # Fixed the "requets" typo and the missing separator that rendered
        # the help as "...brokerDefaults to 40MB.".
        help='Max size of requests sent to the kafka broker. '
             + 'Defaults to 40MB.')
    parser.add_argument(
        '-w', '--num-workers', dest='n_workers', type=int, default=5,
        help='Number of workers to issue elasticsearch queries in parallel. '
             + 'Defaults to 5.')
    parser.add_argument(
        '-v', '--verbose', dest='verbose', default=False, action='store_true',
        help='Increase logging to INFO')
    parser.add_argument(
        '-vv', '--very-verbose', dest='very_verbose', default=False,
        action='store_true',
        help='Increase logging to DEBUG')
    args = parser.parse_args(argv)
    return dict(vars(args))
def __init__(self, ip_address, user, password=None, key_filename=None):
    """Store connection details and prepare an unconnected SSH client.

    :param ip_address: stored as ``self.ip_address``.
    :param user: stored as ``self.user``.
    :param password: optional, stored as ``self.password``.
    :param key_filename: optional, stored as ``self.key_filename``.

    The receive buffer setting is read from the ``general`` section of the
    YAML file named by ``Utilvnf().test_env_config_yaml``.
    """
    # Connection parameters.
    self.ip_address = ip_address
    self.user = user
    self.password = password
    self.key_filename = key_filename

    # Nothing is connected yet.
    self.connected = False
    self.shell = None

    self.logger.setLevel(logging.INFO)

    client = paramiko.SSHClient()
    self.ssh = client
    # Unknown host keys are accepted automatically.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    self.util = Utilvnf()
    with open(self.util.test_env_config_yaml) as file_fd:
        test_env_config_yaml = yaml.safe_load(file_fd)
        # Explicit close retained; leaving the ``with`` block closes too.
        file_fd.close()

    general_section = test_env_config_yaml.get("general")
    self.ssh_revieve_buff = general_section.get("ssh_receive_buffer")
def filter(self, rec):
    """
    Decide whether a record is emitted, tagging it with a zone on the way.

    Records at INFO level or above always pass. For lower levels the zone
    and message are taken from ``re_log`` when the message matches; the
    record then passes if its zone (or ``'*'``) is in ``zones``, or, when
    no zones are set, if ``verbose`` is greater than 2.

    :param rec: message to record
    """
    rec.zone = rec.module
    if rec.levelno >= logging.INFO:
        return True

    parsed = re_log.match(rec.msg)
    if parsed:
        rec.zone = parsed.group(1)
        rec.msg = parsed.group(2)

    if zones:
        return getattr(rec, 'zone', '') in zones or '*' in zones
    if not verbose > 2:
        return False
    return True
def main():
    """Scrape archived versions of a website for the requested period."""
    args = parse_args()
    log_level = logging.WARN if args.quiet else logging.INFO
    logging.basicConfig(level=log_level)

    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)

    # Scrape results are stored in a temporary folder if no folder specified
    if args.target_folder:
        target_folder = args.target_folder
    else:
        target_folder = tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))

    # Parse the period entered by the user (throws an exception if the dates are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)

    # The scraper downloads the elements matching the given xpath expression in the target folder
    scraper = Scraper(target_folder, args.xpath)

    # Launch the scraping using the scraper previously instantiated
    step = timedelta(days=args.delta)
    scrape_archives(args.website_url, scraper.scrape, from_date, to_date,
                    args.user_agent, step, concurrency)